Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
e8c27a5
Allow missing shard stats for restarted nodes for _snapshot/_status
JeremyDahlgren May 23, 2025
52909c7
[CI] Auto commit changes from spotless
May 23, 2025
d88eb20
Merge branch 'main' into fix/es-10982
JeremyDahlgren May 23, 2025
6e41497
Merge branch 'main' into fix/es-10982
JeremyDahlgren May 30, 2025
83bd1a9
Use -1 for missing stats values, address code review comments
JeremyDahlgren May 30, 2025
f3464b5
Update docs/changelog/128399.yaml
JeremyDahlgren May 30, 2025
b7414af
Merge branch 'main' into fix/es-10982
JeremyDahlgren May 31, 2025
a534a12
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 2, 2025
64217d4
Improve serialization tests, address code review comments
JeremyDahlgren Jun 3, 2025
1cd8dc8
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 3, 2025
a8b2d2e
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 3, 2025
b2f4bf3
Update server/src/main/java/org/elasticsearch/action/admin/cluster/sn…
JeremyDahlgren Jun 3, 2025
b1fc83c
Fix fromXContent() bug, add @Nullable, update description string
JeremyDahlgren Jun 3, 2025
91a6442
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 3, 2025
3908def
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 3, 2025
b1db683
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 6, 2025
4f157ee
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 6, 2025
bd55889
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 9, 2025
e8eeed6
Merge branch 'main' into fix/es-10982
JeremyDahlgren Jun 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/128399.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 128399
summary: Allow missing shard stats for restarted nodes for `_snapshot/_status`
area: Snapshot/Restore
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStage;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStats;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.elasticsearch.action.support.GroupedActionListener;
Expand Down Expand Up @@ -212,7 +211,8 @@ public void testGetSnapshotsWithoutIndices() throws Exception {
* 1. Start snapshot of two shards (both located on separate data nodes).
* 2. Have one of the shards snapshot completely and the other block
* 3. Restart the data node that completed its shard snapshot
* 4. Make sure that snapshot status APIs show correct file-counts and -sizes
* 4. Make sure that snapshot status APIs show correct file-counts and -sizes for non-restarted nodes
* 5. Make sure the description string is set for shard snapshots on restarted nodes.
*
* @throws Exception on failure
*/
Expand Down Expand Up @@ -248,21 +248,24 @@ public void testCorrectCountsForDoneShards() throws Exception {
assertThat(snapshotShardState.getStage(), is(SnapshotIndexShardStage.DONE));
assertThat(snapshotShardState.getStats().getTotalFileCount(), greaterThan(0));
assertThat(snapshotShardState.getStats().getTotalSize(), greaterThan(0L));
assertNull("expected a null description for snapshot shard status: " + snapshotShardState, snapshotShardState.getDescription());
}, 30L, TimeUnit.SECONDS);

final SnapshotStats snapshotShardStats = stateFirstShard(getSnapshotStatus(repoName, snapshotOne), indexTwo).getStats();
final int totalFiles = snapshotShardStats.getTotalFileCount();
final long totalFileSize = snapshotShardStats.getTotalSize();

internalCluster().restartNode(dataNodeTwo);

final SnapshotIndexShardStatus snapshotShardStateAfterNodeRestart = stateFirstShard(
getSnapshotStatus(repoName, snapshotOne),
indexTwo
);
assertThat(snapshotShardStateAfterNodeRestart.getStage(), is(SnapshotIndexShardStage.DONE));
assertThat(snapshotShardStateAfterNodeRestart.getStats().getTotalFileCount(), equalTo(totalFiles));
assertThat(snapshotShardStateAfterNodeRestart.getStats().getTotalSize(), equalTo(totalFileSize));
final var snapshotStatusAfterRestart = getSnapshotStatus(repoName, snapshotOne);

final var snapshotShardStateIndexTwo = stateFirstShard(snapshotStatusAfterRestart, indexTwo);
assertThat(snapshotShardStateIndexTwo.getStage(), is(SnapshotIndexShardStage.DONE));
assertNotNull("expected a non-null description string for missing stats", snapshotShardStateIndexTwo.getDescription());
final var missingStats = snapshotShardStateIndexTwo.getStats();
assertThat(missingStats.getTotalFileCount(), equalTo(-1));
assertThat(missingStats.getTotalSize(), equalTo(-1L));

final var snapshotShardStateIndexOne = stateFirstShard(snapshotStatusAfterRestart, indexOne);
assertNull("expected a null description string for available stats", snapshotShardStateIndexOne.getDescription());
assertThat(snapshotShardStateIndexOne.getStats().getTotalFileCount(), greaterThan(0));
assertThat(snapshotShardStateIndexOne.getStats().getTotalSize(), greaterThan(0L));

unblockAllDataNodes(repoName);
assertThat(responseSnapshotOne.get().getSnapshotInfo().state(), is(SnapshotState.SUCCESS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ static TransportVersion def(int id) {
public static final TransportVersion IDP_CUSTOM_SAML_ATTRIBUTES = def(9_087_0_00);
public static final TransportVersion JOIN_ON_ALIASES = def(9_088_0_00);
public static final TransportVersion ILM_ADD_SKIP_SETTING = def(9_089_0_00);
public static final TransportVersion SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS = def(9_090_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.xcontent.ToXContentFragment;
Expand All @@ -21,6 +22,8 @@
import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.TransportVersions.SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS;

public class SnapshotIndexShardStatus extends BroadcastShardResponse implements ToXContentFragment {

private final SnapshotIndexShardStage stage;
Expand All @@ -31,12 +34,17 @@ public class SnapshotIndexShardStatus extends BroadcastShardResponse implements

private String failure;

private String description;

public SnapshotIndexShardStatus(StreamInput in) throws IOException {
super(in);
stage = SnapshotIndexShardStage.fromValue(in.readByte());
stats = new SnapshotStats(in);
nodeId = in.readOptionalString();
failure = in.readOptionalString();
if (in.getTransportVersion().onOrAfter(SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS)) {
description = in.readOptionalString();
}
}

SnapshotIndexShardStatus(ShardId shardId, SnapshotIndexShardStage stage) {
Expand Down Expand Up @@ -74,11 +82,38 @@ public SnapshotIndexShardStatus(StreamInput in) throws IOException {
}

SnapshotIndexShardStatus(ShardId shardId, SnapshotIndexShardStage stage, SnapshotStats stats, String nodeId, String failure) {
this(shardId, stage, stats, nodeId, failure, null);
}

SnapshotIndexShardStatus(
ShardId shardId,
SnapshotIndexShardStage stage,
SnapshotStats stats,
String nodeId,
String failure,
@Nullable String description
) {
super(shardId);
this.stage = stage;
this.stats = stats;
this.nodeId = nodeId;
this.failure = failure;
this.description = description;
}

/**
* Creates an instance for scenarios where the snapshot is {@link SnapshotIndexShardStage#DONE} but the stats are unavailable, with a
* non-null description of why the stats are missing.
*/
public static SnapshotIndexShardStatus forDoneButMissingStats(ShardId shardId, String description) {
return new SnapshotIndexShardStatus(
shardId,
SnapshotIndexShardStage.DONE,
SnapshotStats.forMissingStats(),
null,
null,
Objects.requireNonNull(description)
);
}

/**
Expand Down Expand Up @@ -109,19 +144,31 @@ public String getFailure() {
return failure;
}

/**
* Returns the optional description of the data values contained in the {@code stats} field.
*/
@Nullable
public String getDescription() {
return description;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByte(stage.value());
stats.writeTo(out);
out.writeOptionalString(nodeId);
out.writeOptionalString(failure);
if (out.getTransportVersion().onOrAfter(SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS)) {
out.writeOptionalString(description);
}
}

static final class Fields {
static final String STAGE = "stage";
static final String REASON = "reason";
static final String NODE = "node";
static final String DESCRIPTION = "description";
}

@Override
Expand All @@ -135,6 +182,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getFailure() != null) {
builder.field(Fields.REASON, getFailure());
}
if (getDescription() != null) {
builder.field(Fields.DESCRIPTION, getDescription());
}
builder.endObject();
return builder;
}
Expand All @@ -151,7 +201,8 @@ public boolean equals(Object o) {
return stage == that.stage
&& Objects.equals(stats, that.stats)
&& Objects.equals(nodeId, that.nodeId)
&& Objects.equals(failure, that.failure);
&& Objects.equals(failure, that.failure)
&& Objects.equals(description, that.description);
}

@Override
Expand All @@ -160,6 +211,7 @@ public int hashCode() {
result = 31 * result + (stats != null ? stats.hashCode() : 0);
result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0);
result = 31 * result + (failure != null ? failure.hashCode() : 0);
result = 31 * result + (description != null ? description.hashCode() : 0);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

import java.io.IOException;

import static org.elasticsearch.TransportVersions.SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS;

public class SnapshotStats implements Writeable, ToXContentObject {

private long startTime;
Expand All @@ -37,6 +39,19 @@ public class SnapshotStats implements Writeable, ToXContentObject {
SnapshotStats() {}

SnapshotStats(StreamInput in) throws IOException {
// We use a boolean to indicate if the stats are present (true) or missing (false), to skip writing all the values if missing.
if (in.getTransportVersion().onOrAfter(SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS) && in.readBoolean() == false) {
startTime = 0L;
time = 0L;
incrementalFileCount = -1;
processedFileCount = -1;
incrementalSize = -1L;
processedSize = -1L;
totalFileCount = -1;
totalSize = -1L;
return;
}

startTime = in.readVLong();
time = in.readVLong();

Expand Down Expand Up @@ -71,6 +86,20 @@ public class SnapshotStats implements Writeable, ToXContentObject {
this.processedSize = processedSize;
}

/**
* Returns a stats instance with invalid field values for use in situations where the snapshot stats are unavailable.
*/
public static SnapshotStats forMissingStats() {
return new SnapshotStats(0L, 0L, -1, -1, -1, -1L, -1L, -1L);
}

/**
* Returns true if this instance is for a shard snapshot with unavailable stats.
*/
public boolean isMissingStats() {
return incrementalFileCount == -1;
}

/**
* Returns time when snapshot started
*/
Expand Down Expand Up @@ -129,6 +158,23 @@ public long getProcessedSize() {

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS)) {
// We use a boolean to indicate if the stats are present (true) or missing (false), to skip writing all the values if missing.
if (isMissingStats()) {
out.writeBoolean(false);
return;
}
out.writeBoolean(true);
} else if (isMissingStats()) {
throw new IllegalStateException(
"cannot serialize empty stats for transport version ["
+ out.getTransportVersion()
+ "] less than ["
+ SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS
+ "]"
);
}

out.writeVLong(startTime);
out.writeVLong(time);

Expand Down Expand Up @@ -204,10 +250,10 @@ public static SnapshotStats fromXContent(XContentParser parser) throws IOExcepti
long time = 0;
int incrementalFileCount = 0;
int totalFileCount = 0;
int processedFileCount = 0;
int processedFileCount = Integer.MIN_VALUE;
long incrementalSize = 0;
long totalSize = 0;
long processedSize = 0;
long processedSize = Long.MIN_VALUE;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser);
String currentName = parser.currentName();
Expand Down Expand Up @@ -282,6 +328,12 @@ public static SnapshotStats fromXContent(XContentParser parser) throws IOExcepti
}
}
}
// Handle the case where the "processed" sub-object is omitted in toXContent() when processedFileCount == incrementalFileCount.
if (processedFileCount == Integer.MIN_VALUE) {
assert processedSize == Long.MIN_VALUE;
processedFileCount = incrementalFileCount;
processedSize = incrementalSize;
}
return new SnapshotStats(
startTime,
time,
Expand All @@ -300,6 +352,10 @@ public static SnapshotStats fromXContent(XContentParser parser) throws IOExcepti
* @param updateTimestamps Whether or not start time and duration should be updated
*/
void add(SnapshotStats stats, boolean updateTimestamps) {
if (stats.isMissingStats()) {
return;
}

incrementalFileCount += stats.incrementalFileCount;
totalFileCount += stats.totalFileCount;
processedFileCount += stats.processedFileCount;
Expand Down
Loading
Loading