Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/137210.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 137210
summary: "Introduce INDEX_SHARD_COUNT_FORMAT"
area: Snapshot/Restore
type: bug
issues:
- 131822
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -227,6 +228,49 @@ public void testUpgradeMovesRepoToNewMetaVersion() throws IOException {
}
}

/**
 * Verifies that snapshots created on an older cluster version remain deletable after the
 * cluster is upgraded: a single-snapshot delete, a bulk delete of a proper subset, and a
 * final delete of everything that is left.
 */
public void testSnapshotCreatedInOldVersionCanBeDeletedInNew() throws IOException {
    final String repoName = getTestName();
    try {
        final int shards = 3;
        final String index = "test-index";
        createIndex(index, shards);
        final IndexVersion minNodeVersion = minimumIndexVersion();
        // 7.12.0+ will try to load RepositoryData during repo creation if verify is true, which is impossible in case of version
        // incompatibility in the downgrade test step.
        final boolean verify = TEST_STEP != TestStep.STEP3_OLD_CLUSTER
            || SnapshotsServiceUtils.includesUUIDs(minNodeVersion)
            || minNodeVersion.before(IndexVersions.V_7_12_0);
    createRepository(repoName, false, verify);

        if (TEST_STEP == TestStep.STEP1_OLD_CLUSTER) {
            // Old-cluster step: take several snapshots and check that all of them completed successfully
            final int snapshotCount = randomIntBetween(5, 10);
            for (int i = 0; i < snapshotCount; i++) {
                createSnapshot(repoName, "snapshot-" + i, index);
            }
            final String[] createdSnapshots = listSnapshots(repoName).stream()
                .map(sn -> (String) sn.get("snapshot"))
                .toArray(String[]::new);
            assertSnapshotStatusSuccessful(repoName, createdSnapshots);
        } else if (TEST_STEP == TestStep.STEP2_NEW_CLUSTER) {
            final List<String> remaining = listSnapshots(repoName).stream()
                .map(sn -> (String) sn.get("snapshot"))
                .collect(Collectors.toCollection(ArrayList::new));

            // Delete a single snapshot
            deleteSnapshot(repoName, remaining.removeFirst());

            // Delete a bulk number of snapshots, avoiding the case where we delete all snapshots since this invokes
            // cleanup code and bulk snapshot deletion logic which is tested in testUpgradeMovesRepoToNewMetaVersion
            final List<String> bulkBatch = randomSubsetOf(randomIntBetween(1, remaining.size() - 1), remaining);
            deleteSnapshot(repoName, bulkBatch);
            remaining.removeAll(bulkBatch);

            // Delete the rest of the snapshots (will invoke bulk snapshot deletion logic)
            deleteSnapshot(repoName, remaining);
        }
    } finally {
        deleteRepository(repoName);
    }
}

private static void assertSnapshotStatusSuccessful(String repoName, String... snapshots) throws IOException {
Request statusReq = new Request("GET", "/_snapshot/" + repoName + "/" + String.join(",", snapshots) + "/_status");
ObjectPath statusResp = ObjectPath.createFromResponse(client().performRequest(statusReq));
Expand All @@ -235,6 +279,12 @@ private static void assertSnapshotStatusSuccessful(String repoName, String... sn
}
}

/**
 * Deletes the given snapshots from the repository with a single bulk DELETE request
 * and asserts that the request was acknowledged.
 */
private void deleteSnapshot(String repoName, List<String> names) throws IOException {
    final String path = "/_snapshot/" + repoName + "/" + String.join(",", names);
    assertAcknowledged(client().performRequest(new Request("DELETE", path)));
}

/**
 * Deletes a single snapshot from the repository and asserts the request was acknowledged.
 */
private void deleteSnapshot(String repoName, String name) throws IOException {
    final Request deleteRequest = new Request("DELETE", "/_snapshot/" + repoName + "/" + name);
    assertAcknowledged(client().performRequest(deleteRequest));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ public Iterator<Setting<?>> settings() {
static final String KEY_SETTINGS_VERSION = "settings_version";
static final String KEY_ALIASES_VERSION = "aliases_version";
static final String KEY_ROUTING_NUM_SHARDS = "routing_num_shards";
static final String KEY_SETTINGS = "settings";
public static final String KEY_SETTINGS = "settings";
static final String KEY_STATE = "state";
static final String KEY_MAPPINGS = "mappings";
static final String KEY_MAPPINGS_HASH = "mappings_hash";
Expand All @@ -596,7 +596,7 @@ public Iterator<Setting<?>> settings() {
static final String KEY_MAPPINGS_UPDATED_VERSION = "mappings_updated_version";
static final String KEY_SYSTEM = "system";
static final String KEY_TIMESTAMP_RANGE = "timestamp_range";
static final String KEY_EVENT_INGESTED_RANGE = "event_ingested_range";
public static final String KEY_EVENT_INGESTED_RANGE = "event_ingested_range";
public static final String KEY_PRIMARY_TERMS = "primary_terms";
public static final String KEY_STATS = "stats";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,19 @@ public static String getRepositoryDataBlobName(long repositoryGeneration) {
Function.identity()
);

/**
 * Parses only the shard count from the IndexMetadata object written by INDEX_METADATA_FORMAT (#131822).
 * Reuses the same blob name ("index-metadata" / METADATA_NAME_FORMAT) as INDEX_METADATA_FORMAT, so it reads
 * the identical blobs while materializing only an {@link IndexShardCount} instead of a full IndexMetadata.
 */
public static final ChecksumBlobStoreFormat<IndexShardCount> INDEX_SHARD_COUNT_FORMAT = new ChecksumBlobStoreFormat<>(
"index-metadata",
METADATA_NAME_FORMAT,
(repoName, parser) -> IndexShardCount.fromIndexMetadata(parser),
// Read-only format: index metadata blobs are written via INDEX_METADATA_FORMAT, never through this one,
// so the serializer must never be invoked (assert trips in tests, exception guards production).
(ignored) -> {
assert false;
throw new UnsupportedOperationException();
}
);

private static final String SNAPSHOT_CODEC = "snapshot";

public static final ChecksumBlobStoreFormat<SnapshotInfo> SNAPSHOT_FORMAT = new ChecksumBlobStoreFormat<>(
Expand Down Expand Up @@ -1327,17 +1340,16 @@ private void determineShardCount(ActionListener<Void> listener) {
private void getOneShardCount(String indexMetaGeneration) {
try {
updateShardCount(
INDEX_METADATA_FORMAT.read(getProjectRepo(), indexContainer, indexMetaGeneration, namedXContentRegistry)
.getNumberOfShards()
INDEX_SHARD_COUNT_FORMAT.read(getProjectRepo(), indexContainer, indexMetaGeneration, namedXContentRegistry).count()
);
} catch (Exception ex) {
logger.warn(() -> format("[%s] [%s] failed to read metadata for index", indexMetaGeneration, indexId.getName()), ex);
logger.warn(() -> format("[%s] [%s] failed to read shard count for index", indexMetaGeneration, indexId.getName()), ex);
// Definitely indicates something fairly badly wrong with the repo, but not immediately fatal here: we might get the
// shard count from another metadata blob, or we might just not process these shards. If we skip these shards then the
// repository will technically enter an invalid state (these shards' index-XXX blobs will refer to snapshots that no
// longer exist) and may contain dangling blobs too. A subsequent delete that hits this index may repair the state if
// the metadata read error is transient, but if not then the stale indices cleanup will eventually remove this index
// and all its extra data anyway.
// shard count from another metadata blob, or we might just not process these shards. If we skip these shards
// then the repository will technically enter an invalid state (these shards' index-XXX blobs will refer to snapshots
// that no longer exist) and may contain dangling blobs too. A subsequent delete that hits this index may repair
// the state if the metadata read error is transient, but if not then the stale indices cleanup will eventually
// remove this index and all its extra data anyway.
// TODO: Should we fail the delete here? See https://github.com/elastic/elasticsearch/issues/100569.
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.repositories.blobstore;

import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.xcontent.XContentParserUtils;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;

import static org.elasticsearch.cluster.metadata.IndexMetadata.KEY_SETTINGS;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;

/**
 * A subset of {@link IndexMetadata} storing only the shard count of an index.
 * Prior to v9.3, the entire {@link IndexMetadata} object was stored in heap and then loaded during snapshotting to determine
 * the shard count. As per ES-12539, this is replaced with the {@link IndexShardCount} class that writes and loads only the index's
 * shard count to and from heap memory, reducing the possibility of smaller nodes going OOM during snapshotting.
 */
public record IndexShardCount(int count) {
/**
 * Parses an {@link IndexMetadata} object, reading only the shard count and skipping the rest.
 * Expected blob layout: START_OBJECT, then a single FIELD_NAME (the index name) whose value is the
 * START_OBJECT holding the metadata sections; the shard count lives under the "settings" section as
 * {@code index.number_of_shards}.
 * @param parser The parser of the {@link IndexMetadata} object
 * @return Returns an {@link IndexShardCount} containing the shard count for the index
 * @throws IOException Thrown if the {@link IndexMetadata} object cannot be parsed correctly
 */
public static IndexShardCount fromIndexMetadata(XContentParser parser) throws IOException {
parser.nextToken(); // fresh parser so move to the first token
parser.nextToken(); // on a start object move to next token
// now positioned on the index-name field that wraps the metadata body
XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.currentToken(), parser);
String currentFieldName;
XContentParser.Token token = parser.nextToken();
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser);

IndexShardCount indexShardCount = null;
// Skip over everything except the index.number_of_shards setting, or any unexpected tokens
while ((currentFieldName = parser.nextFieldName()) != null) {
token = parser.nextToken();
if (token == XContentParser.Token.START_OBJECT) {
if (currentFieldName.equals(KEY_SETTINGS)) {
// walk the flat settings object; only index.number_of_shards is kept
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
parser.nextToken();
if (SETTING_NUMBER_OF_SHARDS.equals(fieldName)) {
indexShardCount = new IndexShardCount(parser.intValue());
} else {
// skipChildren() is a no-op on scalar values, so this safely skips nested objects/arrays too
parser.skipChildren();
}
}
} else {
parser.skipChildren();
}
} else if (token == XContentParser.Token.START_ARRAY) {
parser.skipChildren();
} else if (token.isValue() == false) {
throw new IllegalArgumentException("Unexpected token " + token);
}
}
// consume the outer END_OBJECT closing the index-name wrapper
XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.nextToken(), parser);

// indexShardCount is null if corruption when parsing
// NOTE(review): -1 here is a sentinel for "settings/shard count missing or corrupt"; confirm that
// callers (e.g. updateShardCount in BlobStoreRepository) treat a negative count as unknown rather
// than using it as a real shard count.
return indexShardCount != null ? indexShardCount : new IndexShardCount(-1);
}
}
Loading