Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
e8c27a5
Allow missing shard stats for restarted nodes for _snapshot/_status
JeremyDahlgren May 23, 2025
52909c7
[CI] Auto commit changes from spotless
May 23, 2025
d88eb20
Merge branch 'main' into fix/es-10982
JeremyDahlgren May 23, 2025
6e41497
Merge branch 'main' into fix/es-10982
JeremyDahlgren May 30, 2025
83bd1a9
Use -1 for missing stats values, address code review comments
JeremyDahlgren May 30, 2025
f3464b5
Update docs/changelog/128399.yaml
JeremyDahlgren May 30, 2025
b7414af
Merge branch 'main' into fix/es-10982
JeremyDahlgren May 31, 2025
a534a12
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 2, 2025
64217d4
Improve serialization tests, address code review comments
JeremyDahlgren Jun 3, 2025
1cd8dc8
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 3, 2025
a8b2d2e
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 3, 2025
b2f4bf3
Update server/src/main/java/org/elasticsearch/action/admin/cluster/sn…
JeremyDahlgren Jun 3, 2025
b1fc83c
Fix fromXContent() bug, add @Nullable, update description string
JeremyDahlgren Jun 3, 2025
91a6442
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 3, 2025
3908def
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 3, 2025
b1db683
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 6, 2025
4f157ee
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 6, 2025
bd55889
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 9, 2025
e8eeed6
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ public void testGetSnapshotsWithoutIndices() throws Exception {
* 1. Start snapshot of two shards (both located on separate data nodes).
* 2. Have one of the shards snapshot completely and the other block
* 3. Restart the data node that completed its shard snapshot
* 4. Make sure that snapshot status APIs show correct file-counts and -sizes
* 4. Make sure that snapshot status APIs show correct file-counts and -sizes for non-restarted nodes
* 5. Make sure the description string is set for shard snapshots on restarted nodes.
*
* @throws Exception on failure
*/
Expand Down Expand Up @@ -261,8 +262,9 @@ public void testCorrectCountsForDoneShards() throws Exception {
indexTwo
);
assertThat(snapshotShardStateAfterNodeRestart.getStage(), is(SnapshotIndexShardStage.DONE));
assertThat(snapshotShardStateAfterNodeRestart.getStats().getTotalFileCount(), equalTo(totalFiles));
assertThat(snapshotShardStateAfterNodeRestart.getStats().getTotalSize(), equalTo(totalFileSize));
assertNotNull("expected a description string for missing stats", snapshotShardStateAfterNodeRestart.getDescription());
assertThat(snapshotShardStateAfterNodeRestart.getStats().getTotalFileCount(), equalTo(0));
assertThat(snapshotShardStateAfterNodeRestart.getStats().getTotalSize(), equalTo(0L));

unblockAllDataNodes(repoName);
assertThat(responseSnapshotOne.get().getSnapshotInfo().state(), is(SnapshotState.SUCCESS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ static TransportVersion def(int id) {
public static final TransportVersion NODES_STATS_SUPPORTS_MULTI_PROJECT = def(9_079_0_00);
public static final TransportVersion ML_INFERENCE_HUGGING_FACE_RERANK_ADDED = def(9_080_0_00);
public static final TransportVersion SETTINGS_IN_DATA_STREAMS_DRY_RUN = def(9_081_0_00);
public static final TransportVersion SNAPSHOT_INDEX_SHARD_STATUS_DESCRIPTION_ADDED = def(9_082_0_00);
/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.TransportVersions.SNAPSHOT_INDEX_SHARD_STATUS_DESCRIPTION_ADDED;

public class SnapshotIndexShardStatus extends BroadcastShardResponse implements ToXContentFragment {

private final SnapshotIndexShardStage stage;
Expand All @@ -31,12 +33,17 @@ public class SnapshotIndexShardStatus extends BroadcastShardResponse implements

private String failure;

private String description;

public SnapshotIndexShardStatus(StreamInput in) throws IOException {
super(in);
stage = SnapshotIndexShardStage.fromValue(in.readByte());
stats = new SnapshotStats(in);
nodeId = in.readOptionalString();
failure = in.readOptionalString();
if (in.getTransportVersion().onOrAfter(SNAPSHOT_INDEX_SHARD_STATUS_DESCRIPTION_ADDED)) {
description = in.readOptionalString();
}
}

SnapshotIndexShardStatus(ShardId shardId, SnapshotIndexShardStage stage) {
Expand Down Expand Up @@ -74,11 +81,23 @@ public SnapshotIndexShardStatus(StreamInput in) throws IOException {
}

/**
 * Creates a shard status without a description.
 * <p>
 * Delegates to the six-argument constructor, passing {@code null} as the description.
 */
SnapshotIndexShardStatus(ShardId shardId, SnapshotIndexShardStage stage, SnapshotStats stats, String nodeId, String failure) {
this(shardId, stage, stats, nodeId, failure, null);
}

/**
 * Creates a shard status with every field supplied explicitly.
 *
 * @param shardId     the shard this status belongs to; passed to the superclass constructor
 * @param stage       the snapshot stage of the shard
 * @param stats       the snapshot statistics for the shard
 * @param nodeId      the node id, may be {@code null} (serialized as an optional string)
 * @param failure     the failure string, may be {@code null} (serialized as an optional string)
 * @param description the description of the values in {@code stats}, may be {@code null}
 */
SnapshotIndexShardStatus(
ShardId shardId,
SnapshotIndexShardStage stage,
SnapshotStats stats,
String nodeId,
String failure,
String description
) {
super(shardId);
this.stage = stage;
this.stats = stats;
this.nodeId = nodeId;
this.failure = failure;
this.description = description;
}

/**
Expand Down Expand Up @@ -109,19 +128,30 @@ public String getFailure() {
return failure;
}

/**
 * Returns the optional description of the data values contained in the {@code stats} field.
 *
 * @return the description, or {@code null} if none was set (for example when the status was created through
 *         a constructor that does not take a description, or was read from a stream that did not carry one)
 */
public String getDescription() {
return description;
}

/**
 * Serializes this shard status to the given stream.
 * <p>
 * The write order (superclass data, stage byte, stats, node id, failure, description) mirrors the read order
 * of the {@code StreamInput} constructor and must be kept in sync with it.
 *
 * @param out the stream to write to
 * @throws IOException if writing to the stream fails
 */
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByte(stage.value());
stats.writeTo(out);
out.writeOptionalString(nodeId);
out.writeOptionalString(failure);
// The description is only written for transport versions on or after the one that introduced it.
if (out.getTransportVersion().onOrAfter(SNAPSHOT_INDEX_SHARD_STATUS_DESCRIPTION_ADDED)) {
out.writeOptionalString(description);
}
}

/**
 * Field names used in the XContent representation of a shard status.
 */
static final class Fields {
static final String STAGE = "stage";
// Carries the failure string; only emitted when the failure is non-null.
static final String REASON = "reason";
static final String NODE = "node";
// Carries the description string; only emitted when the description is non-null.
static final String DESCRIPTION = "description";
}

@Override
Expand All @@ -135,6 +165,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getFailure() != null) {
builder.field(Fields.REASON, getFailure());
}
if (getDescription() != null) {
builder.field(Fields.DESCRIPTION, getDescription());
}
builder.endObject();
return builder;
}
Expand All @@ -151,7 +184,8 @@ public boolean equals(Object o) {
return stage == that.stage
&& Objects.equals(stats, that.stats)
&& Objects.equals(nodeId, that.nodeId)
&& Objects.equals(failure, that.failure);
&& Objects.equals(failure, that.failure)
&& Objects.equals(description, that.description);
}

@Override
Expand All @@ -160,6 +194,7 @@ public int hashCode() {
result = 31 * result + (stats != null ? stats.hashCode() : 0);
result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0);
result = 31 * result + (failure != null ? failure.hashCode() : 0);
result = 31 * result + (description != null ? description.hashCode() : 0);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,13 @@ void buildResponse(
}
SnapshotsInProgress.ShardSnapshotStatus status = shardEntry.getValue();
if (status.nodeId() != null) {
// We should have information about this shard from the shard:
// We should have information about this shard from the node:
TransportNodesSnapshotsStatus.NodeSnapshotStatus nodeStatus = nodeSnapshotStatusMap.get(status.nodeId());
if (nodeStatus != null) {
Map<ShardId, SnapshotIndexShardStatus> shardStatues = nodeStatus.status().get(entry.snapshot());
if (shardStatues != null) {
Map<ShardId, SnapshotIndexShardStatus> shardStatuses = nodeStatus.status().get(entry.snapshot());
if (shardStatuses != null) {
final ShardId sid = entry.shardId(shardEntry.getKey());
SnapshotIndexShardStatus shardStatus = shardStatues.get(sid);
SnapshotIndexShardStatus shardStatus = shardStatuses.get(sid);
if (shardStatus != null) {
// We have full information about this shard
if (shardStatus.getStage() == SnapshotIndexShardStage.DONE
Expand All @@ -228,26 +228,25 @@ void buildResponse(
}
// We failed to find the status of the shard from the responses we received from data nodes.
// This can happen if nodes drop out of the cluster completely or restart during the snapshot.
// We rebuild the information they would have provided from their in memory state from the cluster
// state and the repository contents in the below logic
final SnapshotIndexShardStage stage = switch (shardEntry.getValue().state()) {
case FAILED, ABORTED, MISSING -> SnapshotIndexShardStage.FAILURE;
case INIT, WAITING, PAUSED_FOR_NODE_REMOVAL, QUEUED -> SnapshotIndexShardStage.STARTED;
case SUCCESS -> SnapshotIndexShardStage.DONE;
};
final SnapshotIndexShardStatus shardStatus;
if (stage == SnapshotIndexShardStage.DONE) {
// Shard snapshot completed successfully so we should be able to load the exact statistics for this
// shard from the repository already.
final ShardId shardId = entry.shardId(shardEntry.getKey());
// When processing currently running snapshots, reading the statistics from the repository can be expensive, so we
// instead provide a message to the caller explaining why the stats are missing and the API that can be used to
// load them.
shardStatus = new SnapshotIndexShardStatus(
shardId,
repositoriesService.repository(entry.repository())
.getShardSnapshotStatus(
entry.snapshot().getSnapshotId(),
entry.indices().get(shardId.getIndexName()),
shardId
)
entry.shardId(shardEntry.getKey()),
stage,
new SnapshotStats(),
null,
null,
"""
Snapshot shard stats missing from a currently running snapshot due to a node leaving the cluster after \
completing the snapshot; use /_snapshot/<repository>/<snapshot>/_status to load from the repository."""
);
} else {
shardStatus = new SnapshotIndexShardStatus(entry.shardId(shardEntry.getKey()), stage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,13 @@ protected SnapshotIndexShardStatus createForIndex(String indexName) {
SnapshotStats stats = new SnapshotStatsTests().createTestInstance();
String nodeId = randomAlphaOfLength(20);
String failure = null;
String description = null;
if (rarely()) {
failure = randomAlphaOfLength(200);
} else if (rarely()) {
description = randomAlphaOfLength(200);
}
return new SnapshotIndexShardStatus(shardId, stage, stats, nodeId, failure);
return new SnapshotIndexShardStatus(shardId, stage, stats, nodeId, failure, description);
}

@Override
Expand Down Expand Up @@ -76,6 +79,7 @@ protected boolean supportsUnknownFields() {
String rawStage = (String) parsedObjects[i++];
String nodeId = (String) parsedObjects[i++];
String failure = (String) parsedObjects[i++];
String description = (String) parsedObjects[i++];
SnapshotStats stats = (SnapshotStats) parsedObjects[i];

SnapshotIndexShardStage stage;
Expand All @@ -89,12 +93,13 @@ protected boolean supportsUnknownFields() {
rawStage
);
}
return new SnapshotIndexShardStatus(shard, stage, stats, nodeId, failure);
return new SnapshotIndexShardStatus(shard, stage, stats, nodeId, failure, description);
}
);
innerParser.declareString(constructorArg(), new ParseField(SnapshotIndexShardStatus.Fields.STAGE));
innerParser.declareString(optionalConstructorArg(), new ParseField(SnapshotIndexShardStatus.Fields.NODE));
innerParser.declareString(optionalConstructorArg(), new ParseField(SnapshotIndexShardStatus.Fields.REASON));
innerParser.declareString(optionalConstructorArg(), new ParseField(SnapshotIndexShardStatus.Fields.DESCRIPTION));
innerParser.declareObject(constructorArg(), (p, c) -> SnapshotStats.fromXContent(p), new ParseField(SnapshotStats.Fields.STATS));
PARSER = (p, indexId, shardName) -> {
// Combine the index name in the context with the shard name passed in for the named object parser
Expand Down
Loading