docs/changelog/137210.yaml (6 additions, 0 deletions):
@@ -0,0 +1,6 @@
pr: 137210
summary: "Introduce INDEX_SHARD_COUNT_FORMAT"
area: Snapshot/Restore
type: bug
issues:
- 131822
MultiVersionRepositoryAccessIT.java:
@@ -27,6 +27,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -227,6 +228,49 @@ public void testUpgradeMovesRepoToNewMetaVersion() throws IOException {
}
}

public void testSnapshotCreatedInOldVersionCanBeDeletedInNew() throws IOException {
final String repoName = getTestName();
try {
final int shards = 3;
final String index = "test-index";
createIndex(index, shards);
final IndexVersion minNodeVersion = minimumIndexVersion();
// 7.12.0+ will try to load RepositoryData during repo creation if verify is true, which is impossible in case of version
// incompatibility in the downgrade test step.
final boolean verify = TEST_STEP != TestStep.STEP3_OLD_CLUSTER
|| SnapshotsServiceUtils.includesUUIDs(minNodeVersion)
|| minNodeVersion.before(IndexVersions.V_7_12_0);
createRepository(repoName, false, verify);

// Create snapshots in the first step
if (TEST_STEP == TestStep.STEP1_OLD_CLUSTER) {
int numberOfSnapshots = randomIntBetween(5, 10);
for (int i = 0; i < numberOfSnapshots; i++) {
createSnapshot(repoName, "snapshot-" + i, index);
}
final List<Map<String, Object>> snapshots = listSnapshots(repoName);
assertSnapshotStatusSuccessful(repoName, snapshots.stream().map(sn -> (String) sn.get("snapshot")).toArray(String[]::new));
} else if (TEST_STEP == TestStep.STEP2_NEW_CLUSTER) {
final List<Map<String, Object>> snapshots = listSnapshots(repoName);
List<String> snapshotNames = new ArrayList<>(snapshots.stream().map(sn -> (String) sn.get("snapshot")).toList());

// Delete a single snapshot
deleteSnapshot(repoName, snapshotNames.removeFirst());

// Delete a bulk number of snapshots, avoiding the case where we delete all snapshots since this invokes
// cleanup code and bulk snapshot deletion logic which is tested in testUpgradeMovesRepoToNewMetaVersion
final List<String> snapshotsToDeleteInBulk = randomSubsetOf(randomIntBetween(1, snapshotNames.size() - 1), snapshotNames);
deleteSnapshot(repoName, snapshotsToDeleteInBulk.toArray(String[]::new));
Contributor: Seems a bit convoluted to convert the list to an array only then to convert it again into a string with comma separators. Maybe make an overload that takes a List<String> and use org.elasticsearch.common.Strings#collectionToCommaDelimitedString to go directly from list to string.
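A minimal sketch of the suggested overload (hypothetical, not part of this diff; it assumes org.elasticsearch.common.Strings is importable from this test class, and java.util.List is already imported above):

    private void deleteSnapshot(String repoName, List<String> names) throws IOException {
        // Join the names directly from the list, skipping the intermediate array.
        String joinedNames = Strings.collectionToCommaDelimitedString(names);
        assertAcknowledged(client().performRequest(new Request("DELETE", "/_snapshot/" + repoName + "/" + joinedNames)));
    }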

snapshotNames.removeAll(snapshotsToDeleteInBulk);

// Delete the rest of the snapshots (will invoke bulk snapshot deletion logic)
deleteSnapshot(repoName, snapshotNames.toArray(String[]::new));
}
} finally {
deleteRepository(repoName);
}
}

private static void assertSnapshotStatusSuccessful(String repoName, String... snapshots) throws IOException {
Request statusReq = new Request("GET", "/_snapshot/" + repoName + "/" + String.join(",", snapshots) + "/_status");
ObjectPath statusResp = ObjectPath.createFromResponse(client().performRequest(statusReq));
@@ -235,8 +279,9 @@ private static void assertSnapshotStatusSuccessful(String repoName, String... sn
}
}

-    private void deleteSnapshot(String repoName, String name) throws IOException {
-        assertAcknowledged(client().performRequest(new Request("DELETE", "/_snapshot/" + repoName + "/" + name)));
+    private void deleteSnapshot(String repoName, String... names) throws IOException {
+        String joinedNames = String.join(",", names);
+        assertAcknowledged(client().performRequest(new Request("DELETE", "/_snapshot/" + repoName + "/" + joinedNames)));
}

@SuppressWarnings("unchecked")
IndexMetadata.java:
@@ -587,7 +587,7 @@ public Iterator<Setting<?>> settings() {
static final String KEY_SETTINGS_VERSION = "settings_version";
static final String KEY_ALIASES_VERSION = "aliases_version";
static final String KEY_ROUTING_NUM_SHARDS = "routing_num_shards";
-    static final String KEY_SETTINGS = "settings";
+    public static final String KEY_SETTINGS = "settings";
static final String KEY_STATE = "state";
static final String KEY_MAPPINGS = "mappings";
static final String KEY_MAPPINGS_HASH = "mappings_hash";
Expand All @@ -596,7 +596,7 @@ public Iterator<Setting<?>> settings() {
static final String KEY_MAPPINGS_UPDATED_VERSION = "mappings_updated_version";
static final String KEY_SYSTEM = "system";
static final String KEY_TIMESTAMP_RANGE = "timestamp_range";
-    static final String KEY_EVENT_INGESTED_RANGE = "event_ingested_range";
+    public static final String KEY_EVENT_INGESTED_RANGE = "event_ingested_range";
public static final String KEY_PRIMARY_TERMS = "primary_terms";
public static final String KEY_STATS = "stats";

BlobStoreRepository.java:
@@ -398,6 +398,19 @@ public static String getRepositoryDataBlobName(long repositoryGeneration) {
Function.identity()
);

/**
* Parses only the shard count from the IndexMetadata object written by INDEX_METADATA_FORMAT (#131822)
*/
public static final ChecksumBlobStoreFormat<IndexShardCount> INDEX_SHARD_COUNT_FORMAT = new ChecksumBlobStoreFormat<>(
"index-metadata",
METADATA_NAME_FORMAT,
(repoName, parser) -> IndexShardCount.fromIndexMetaData(parser),
(ignored) -> {
assert false;
throw new UnsupportedOperationException();
}
);

private static final String SNAPSHOT_CODEC = "snapshot";

public static final ChecksumBlobStoreFormat<SnapshotInfo> SNAPSHOT_FORMAT = new ChecksumBlobStoreFormat<>(
@@ -1327,17 +1340,16 @@ private void determineShardCount(ActionListener<Void> listener) {
private void getOneShardCount(String indexMetaGeneration) {
try {
         updateShardCount(
-            INDEX_METADATA_FORMAT.read(getProjectRepo(), indexContainer, indexMetaGeneration, namedXContentRegistry)
-                .getNumberOfShards()
+            INDEX_SHARD_COUNT_FORMAT.read(getProjectRepo(), indexContainer, indexMetaGeneration, namedXContentRegistry).count()
         );
     } catch (Exception ex) {
-        logger.warn(() -> format("[%s] [%s] failed to read metadata for index", indexMetaGeneration, indexId.getName()), ex);
-        // Definitely indicates something fairly badly wrong with the repo, but not immediately fatal here: we might get the
-        // shard count from another metadata blob, or we might just not process these shards. If we skip these shards then the
-        // repository will technically enter an invalid state (these shards' index-XXX blobs will refer to snapshots that no
-        // longer exist) and may contain dangling blobs too. A subsequent delete that hits this index may repair the state if
-        // the metadata read error is transient, but if not then the stale indices cleanup will eventually remove this index
-        // and all its extra data anyway.
+        logger.warn(() -> format("[%s] [%s] failed to read shard count for index", indexMetaGeneration, indexId.getName()), ex);
+        // Definitely indicates something fairly badly wrong with the repo, but not immediately fatal here: we might get the
+        // shard count from a subsequent indexMetaGeneration, or we might just not process these shards. If we skip these shards
+        // then the repository will technically enter an invalid state (these shards' index-XXX blobs will refer to snapshots
+        // that no longer exist) and may contain dangling blobs too. A subsequent delete that hits this index may repair
+        // the state if the metadata read error is transient, but if not then the stale indices cleanup will eventually
+        // remove this index and all its extra data anyway.

Contributor: I think I preferred "another metadata blob" here. "Subsequent" implies some kind of ordering but we're loading all this stuff in parallel, and might instead have got the shard count from an earlier blob. Also indexMetaGeneration is really the name of the blob rather than the blob itself, so metadata blob is more accurate.
// TODO: Should we fail the delete here? See https://github.com/elastic/elasticsearch/issues/100569.
}
}
IndexShardCount.java (new file, package org.elasticsearch.repositories.blobstore):
@@ -0,0 +1,69 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.repositories.blobstore;

import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParserUtils;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;

import static org.elasticsearch.cluster.metadata.IndexMetadata.KEY_SETTINGS;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;

/**
 * A subset of {@link IndexMetadata} storing only the shard count of an index.
 * Prior to v9.3, the entire {@link IndexMetadata} object was loaded into heap during snapshotting to determine
 * the shard count. As per ES-12539, it is replaced with the {@link IndexShardCount} class, which writes and loads only the
 * index's shard count, reducing the possibility of smaller nodes going OOM during snapshotting.
*/
public record IndexShardCount(int count) {
/**
* Parses an {@link IndexMetadata} object, reading only the shard count and skipping the rest
* @param parser The parser of the {@link IndexMetadata} object
 * @return An {@link IndexShardCount} containing the shard count for the index
* @throws IOException Thrown if the {@link IndexMetadata} object cannot be parsed correctly
*/
public static IndexShardCount fromIndexMetaData(XContentParser parser) throws IOException {
Contributor: nit: metadata is one word.

Suggested change:
-    public static IndexShardCount fromIndexMetaData(XContentParser parser) throws IOException {
+    public static IndexShardCount fromIndexMetadata(XContentParser parser) throws IOException {

if (parser.currentToken() == null) { // fresh parser? move to the first token
parser.nextToken();
}
if (parser.currentToken() == XContentParser.Token.START_OBJECT) { // on a start object move to next token
parser.nextToken();
}
Contributor: I think this leniency is needed in IndexMetadata because it gets parsed from several different sources (at least, it probably used to, maybe not any more). Here we know we're reading from the blob in the snapshot repository so I think we can tighten this up.

Contributor Author: I ran BlobStoreRepositoryDeleteThrottlingTests here in debug mode and confirmed that both of these code paths are executed during the runtime of that test, and removing them fails the test.

Contributor: Sure, but do we ever not run these branches?
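For illustration, the tightened prologue the reviewer seems to have in mind might look like this (a sketch only, assuming the repository blob is always read with a fresh parser positioned before the wrapping object):

    // Sketch, not the committed code: fail fast instead of tolerating an already-advanced parser.
    XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
    XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.nextToken(), parser);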

XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.currentToken(), parser);
String currentFieldName;
XContentParser.Token token = parser.nextToken();
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser);

IndexShardCount indexShardCount = new IndexShardCount(-1);
Contributor: Do we need a new dummy object here? We could use null to avoid the allocation. Or maybe we can return from this whole method early as soon as we've read the shard count? No need to consume this parser all the way to the end IIRC.

Contributor Author: Unfortunately we do need to consume the parser to the end in order to satisfy this constraint, here:

    result = reader.apply(projectRepo, parser);
    XContentParserUtils.ensureExpectedToken(null, parser.nextToken(), parser);

My original solution returned early as an optimisation and threw an error for this reason.

However, I can initialise the variable to null if you believe that to be better. By setting it to -1 I was protecting us from returning a null in the case of a malformed IndexMetadata object. If that now does occur, this code:

    try {
        updateShardCount(
            INDEX_SHARD_COUNT_FORMAT.read(getProjectRepo(), indexContainer, indexMetaGeneration, namedXContentRegistry).count()
        );
    } catch (Exception ex) {
        logger.warn(() -> format("[%s] [%s] failed to read shard count for index", indexMetaGeneration, indexId.getName()), ex);
        // Definitely indicates something fairly badly wrong with the repo, but not immediately fatal here: we might get the
        // shard count from another metadata blob, or we might just not process these shards. If we skip these shards
        // then the repository will technically enter an invalid state (these shards' index-XXX blobs will refer to snapshots
        // that no longer exist) and may contain dangling blobs too. A subsequent delete that hits this index may repair
        // the state if the metadata read error is transient, but if not then the stale indices cleanup will eventually
        // remove this index and all its extra data anyway.
        // TODO: Should we fail the delete here? See https://github.com/elastic/elasticsearch/issues/100569.
    }

will throw an NPE, since null.count() isn't possible. The error will be caught and ignored, and we will move on to the next blob. I am happy with this behaviour, just wanted it documented.

Contributor: I see, ok, thanks for checking. I'd rather we convert a null into a non-null placeholder value at the end of the method (since it should basically never occur absent corruption).
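Concretely, the shape being agreed on might look like this (a sketch of the reviewer's suggestion, not the committed code):

    // Track null while parsing; substitute a placeholder only on return.
    IndexShardCount indexShardCount = null;
    // ... parse as below, assigning indexShardCount when the settings object is found ...
    XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.nextToken(), parser);
    // A null here should only happen for a corrupted blob; -1 spares the caller an NPE.
    return indexShardCount != null ? indexShardCount : new IndexShardCount(-1);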

// Skip over everything except the settings object we care about, or any unexpected tokens
while ((currentFieldName = parser.nextFieldName()) != null) {
token = parser.nextToken();
if (token == XContentParser.Token.START_OBJECT) {
if (currentFieldName.equals(KEY_SETTINGS)) {
Settings settings = Settings.fromXContent(parser);
indexShardCount = new IndexShardCount(settings.getAsInt(SETTING_NUMBER_OF_SHARDS, -1));
Contributor: Could we push the skipping behaviour down even further here? There's no need to load a whole Settings. It's not as big as the mappings but can still be sizeable (e.g. index.query.default_field could list thousands of field names).
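A rough sketch of pushing the skipping further down (an assumption, not the committed code; it relies on the snapshot blob storing settings as flat keys, so index.number_of_shards appears as a single field):

    // Scan the settings object for the one key we need, skipping everything else.
    int shards = -1;
    String settingName;
    while ((settingName = parser.nextFieldName()) != null) {
        XContentParser.Token t = parser.nextToken();
        if (t.isValue() && SETTING_NUMBER_OF_SHARDS.equals(settingName)) {
            shards = Integer.parseInt(parser.text());
        } else if (t == XContentParser.Token.START_OBJECT || t == XContentParser.Token.START_ARRAY) {
            parser.skipChildren(); // nested values we don't need
        }
    }
    indexShardCount = new IndexShardCount(shards);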

} else {
// Iterate through the object, but we don't care about its contents
parser.skipChildren();
Contributor: nit: I don't think we need a comment describing what skipChildren() does.

}
} else if (token == XContentParser.Token.START_ARRAY) {
// Iterate through the array, but we don't care about its contents
parser.skipChildren();
} else if (token.isValue() == false) {
throw new IllegalArgumentException("Unexpected token " + token);
}
}
XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.nextToken(), parser);
return indexShardCount;
}
}
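Finally, a self-contained usage sketch of the parser above (hypothetical; it assumes Settings.fromXContent flattens keys so SETTING_NUMBER_OF_SHARDS resolves, and uses the standard org.elasticsearch.xcontent parser factories):

    // Hypothetical: parse a minimal IndexMetadata-shaped document and extract its shard count.
    String json = "{\"test-index\":{\"settings\":{\"index.number_of_shards\":\"3\"}}}";
    try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, json)) {
        IndexShardCount count = IndexShardCount.fromIndexMetaData(parser);
        assert count.count() == 3;
    }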