5 changes: 5 additions & 0 deletions docs/changelog/137210.yaml
@@ -0,0 +1,5 @@
pr: 137210
summary: "[WIP] Introduce INDEX_SHARD_COUNT_FORMAT"
area: Snapshot/Restore
type: bug
issues: []
@@ -587,7 +587,7 @@ public Iterator<Setting<?>> settings() {
static final String KEY_SETTINGS_VERSION = "settings_version";
static final String KEY_ALIASES_VERSION = "aliases_version";
static final String KEY_ROUTING_NUM_SHARDS = "routing_num_shards";
static final String KEY_SETTINGS = "settings";
public static final String KEY_SETTINGS = "settings";
static final String KEY_STATE = "state";
static final String KEY_MAPPINGS = "mappings";
static final String KEY_MAPPINGS_HASH = "mappings_hash";
@@ -596,7 +596,7 @@ public Iterator<Setting<?>> settings() {
static final String KEY_MAPPINGS_UPDATED_VERSION = "mappings_updated_version";
static final String KEY_SYSTEM = "system";
static final String KEY_TIMESTAMP_RANGE = "timestamp_range";
static final String KEY_EVENT_INGESTED_RANGE = "event_ingested_range";
public static final String KEY_EVENT_INGESTED_RANGE = "event_ingested_range";
public static final String KEY_PRIMARY_TERMS = "primary_terms";
public static final String KEY_STATS = "stats";

@@ -2881,13 +2881,7 @@ public static IndexMetadata legacyFromXContent(XContentParser parser) throws IOE
} else if (token == XContentParser.Token.START_OBJECT) {
if ("settings".equals(currentFieldName)) {
Settings settings = Settings.fromXContent(parser);
if (SETTING_INDEX_VERSION_COMPATIBILITY.get(settings).isLegacyIndexVersion() == false) {
throw new IllegalStateException(
"this method should only be used to parse older incompatible index metadata versions "
+ "but got "
+ SETTING_INDEX_VERSION_COMPATIBILITY.get(settings).toReleaseVersion()
);
}
checkSettingIndexVersionCompatibility(settings);
builder.settings(settings);
} else if ("mappings".equals(currentFieldName)) {
Map<String, Object> mappingSourceBuilder = new HashMap<>();
@@ -2980,6 +2974,16 @@ private static void handleLegacyMapping(Builder builder, Map<String, Object> map
}
}

private static void checkSettingIndexVersionCompatibility(Settings settings) {
if (SETTING_INDEX_VERSION_COMPATIBILITY.get(settings).isLegacyIndexVersion() == false) {
throw new IllegalStateException(
"this method should only be used to parse older incompatible index metadata versions "
+ "but got "
+ SETTING_INDEX_VERSION_COMPATIBILITY.get(settings).toReleaseVersion()
);
}
}

/**
* Return the {@link IndexVersion} of Elasticsearch that has been used to create an index given its settings.
*
@@ -398,6 +398,16 @@ public static String getRepositoryDataBlobName(long repositoryGeneration) {
Function.identity()
);

public static final ChecksumBlobStoreFormat<IndexShardCount> INDEX_SHARD_COUNT_FORMAT = new ChecksumBlobStoreFormat<>(
"index-metadata",
METADATA_NAME_FORMAT,
(repoName, parser) -> IndexShardCount.fromIndexMetaData(parser),
(ignored) -> {
assert false;
throw new UnsupportedOperationException();
}
);

private static final String SNAPSHOT_CODEC = "snapshot";

public static final ChecksumBlobStoreFormat<SnapshotInfo> SNAPSHOT_FORMAT = new ChecksumBlobStoreFormat<>(
@@ -1327,13 +1337,13 @@ private void determineShardCount(ActionListener<Void> listener) {
private void getOneShardCount(String indexMetaGeneration) {
try {
updateShardCount(
INDEX_METADATA_FORMAT.read(getProjectRepo(), indexContainer, indexMetaGeneration, namedXContentRegistry)
.getNumberOfShards()
INDEX_SHARD_COUNT_FORMAT.read(getProjectRepo(), indexContainer, indexMetaGeneration, namedXContentRegistry).count()
);
} catch (Exception ex) {
logger.warn(() -> format("[%s] [%s] failed to read metadata for index", indexMetaGeneration, indexId.getName()), ex);
logger.warn(() -> format("[%s] [%s] failed to read shard count for index", indexMetaGeneration, indexId.getName()), ex);
// Definitely indicates something fairly badly wrong with the repo, but not immediately fatal here: we might get the
// shard count from another metadata blob, or we might just not process these shards. If we skip these shards then the
// shard count from a subsequent indexMetaGeneration, or we might just not process these shards. If we skip these shards
Contributor
I think I preferred "another metadata blob" here. "Subsequent" implies some kind of ordering but we're loading all this stuff in parallel, and might instead have got the shard count from an earlier blob. Also indexMetaGeneration is really the name of the blob rather than the blob itself, so metadata blob is more accurate.

// then the
// repository will technically enter an invalid state (these shards' index-XXX blobs will refer to snapshots that no
// longer exist) and may contain dangling blobs too. A subsequent delete that hits this index may repair the state if
// the metadata read error is transient, but if not then the stale indices cleanup will eventually remove this index
@@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.repositories.blobstore;

import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParserUtils;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;

import static org.elasticsearch.cluster.metadata.IndexMetadata.KEY_SETTINGS;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;

/**
* A subset of {@link IndexMetadata} storing only the shard count of an index.
* Prior to v9.3, the entire {@link IndexMetadata} object was loaded into heap during snapshotting just to determine
* the shard count. As per ES-12539, it is replaced with the {@link IndexShardCount} class, which writes and loads only the index's
* shard count to and from heap memory, reducing the possibility of smaller nodes going OOM during snapshotting.
*/
public record IndexShardCount(int count) {
/**
* Parses an {@link IndexMetadata} object, reading only the shard count and skipping the rest
* @param parser The parser of the {@link IndexMetadata} object
* @return Returns an {@link IndexShardCount} containing the shard count for the index
* @throws IOException Thrown if the {@link IndexMetadata} object cannot be parsed correctly
*/
public static IndexShardCount fromIndexMetaData(XContentParser parser) throws IOException {
Contributor
nit: metadata is one word

Suggested change
public static IndexShardCount fromIndexMetaData(XContentParser parser) throws IOException {
public static IndexShardCount fromIndexMetadata(XContentParser parser) throws IOException {

if (parser.currentToken() == null) { // fresh parser? move to the first token
parser.nextToken();
}
if (parser.currentToken() == XContentParser.Token.START_OBJECT) { // on a start object move to next token
parser.nextToken();
}
Contributor
I think this leniency is needed in IndexMetadata because it gets parsed from several different sources (at least, it probably used to, maybe not any more). Here we know we're reading from the blob in the snapshot repository so I think we can tighten this up.

Contributor Author
I ran BlobStoreRepositoryDeleteThrottlingTests here in debug mode and confirmed that both of these code paths are executed during the runtime of that test, and removing them fails the test.

Contributor
Sure but do we ever not run these branches?
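If the answer is no, the lenient preamble might reduce to a strict check; a sketch only, assuming the parser always arrives fresh from the blob store format:

// Strict variant: a fresh parser has no current token, so the first nextToken()
// must yield the root START_OBJECT; then advance onto the first field name,
// which the existing ensureExpectedToken(FIELD_NAME, ...) below validates.
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
parser.nextToken();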

XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.currentToken(), parser);
String currentFieldName;
XContentParser.Token token = parser.nextToken();
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser);

IndexShardCount indexShardCount = new IndexShardCount(-1);
Contributor
Do we need a new dummy object here? We could use null to avoid the allocation. Or maybe we can return from this whole method early as soon as we've read the shard count? No need to consume this parser all the way to the end IIRC.

Contributor Author
Unfortunately we do need to consume the parser to the end in order to satisfy this constraint, here:

result = reader.apply(projectRepo, parser);
XContentParserUtils.ensureExpectedToken(null, parser.nextToken(), parser);

My original solution returned early as an optimisation and threw an error for this reason.

However, I can initialise the variable to null if you believe that to be better. By setting it to -1 I was protecting us from returning a null in the case of a malformed IndexMetadata object. If this now does occur, this code here:

try {
    updateShardCount(
        INDEX_SHARD_COUNT_FORMAT.read(getProjectRepo(), indexContainer, indexMetaGeneration, namedXContentRegistry).count()
    );
} catch (Exception ex) {
    logger.warn(() -> format("[%s] [%s] failed to read shard count for index", indexMetaGeneration, indexId.getName()), ex);
    // Definitely indicates something fairly badly wrong with the repo, but not immediately fatal here: we might get the
    // shard count from another metadata blob, or we might just not process these shards. If we skip these shards
    // then the repository will technically enter an invalid state (these shards' index-XXX blobs will refer to snapshots
    // that no longer exist) and may contain dangling blobs too. A subsequent delete that hits this index may repair
    // the state if the metadata read error is transient, but if not then the stale indices cleanup will eventually
    // remove this index and all its extra data anyway.
    // TODO: Should we fail the delete here? See https://github.com/elastic/elasticsearch/issues/100569.
}

will throw an NPE since null.count() isn't possible. The error will be caught and ignored, and we will move on to the next blob. I am happy with this behaviour, I just wanted it documented.

Contributor
I see, ok, thanks for checking. I'd rather we convert a null into a non-null placeholder value at the end of the method (since it should basically never occur absent corruption).
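A minimal sketch of that suggestion (the helper name is hypothetical, and the -1 placeholder is carried over from the current code):

// Track the parsed value as a nullable local while parsing, then convert a null
// (settings missing or malformed, which should basically never happen absent
// corruption) into the placeholder once, at the end, instead of allocating a
// dummy record up front.
private static IndexShardCount toIndexShardCount(Integer parsedShardCount) {
    return new IndexShardCount(parsedShardCount == null ? -1 : parsedShardCount);
}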

// Skip over everything except the settings object we care about, or any unexpected tokens
while ((currentFieldName = parser.nextFieldName()) != null) {
token = parser.nextToken();
if (token == XContentParser.Token.START_OBJECT) {
if (currentFieldName.equals(KEY_SETTINGS)) {
Settings settings = Settings.fromXContent(parser);
indexShardCount = new IndexShardCount(settings.getAsInt(SETTING_NUMBER_OF_SHARDS, -1));
Contributor
Could we push the skipping behaviour down even further here? There's no need to load a whole Settings. It's not as big as the mappings but can still be sizeable (e.g. index.query.default_field could list thousands of field names).
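A rough sketch of what that deeper skipping might look like (the helper name is hypothetical, and it assumes the flat key form, e.g. "index.number_of_shards": "3", produced by flat_settings serialization; the nested form is not handled):

// Read only the shard count out of the settings object, skipping every other
// setting so that large values (e.g. index.query.default_field) are never
// materialized in heap.
private static int readShardCountFromSettings(XContentParser parser) throws IOException {
    int numberOfShards = -1;
    String settingName;
    while ((settingName = parser.nextFieldName()) != null) {
        XContentParser.Token token = parser.nextToken();
        if (token.isValue() && "index.number_of_shards".equals(settingName)) {
            numberOfShards = Integer.parseInt(parser.text());
        } else {
            parser.skipChildren(); // no-op for value tokens, skips nested objects/arrays
        }
    }
    return numberOfShards;
}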

} else {
// Iterate through the object, but we don't care about its contents
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
}
Member
It should be performant to call parser.skipChildren() and avoid the while loops.
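For illustration, a sketch of that suggestion (not a committed change):

} else {
    // skipChildren() jumps from the current START_OBJECT straight past its
    // matching END_OBJECT, replacing the manual drain loop.
    parser.skipChildren();
}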

}
} else if (token == XContentParser.Token.START_ARRAY) {
// Iterate through the array, but we don't care about its contents
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
}
} else if (token.isValue() == false) {
throw new IllegalArgumentException("Unexpected token " + token);
}
}
XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.nextToken(), parser);
return indexShardCount;
}
}
@@ -267,7 +267,7 @@ public void testIndexMetadataFromXContentParsingWithoutEventIngestedField() thro
IndexMetadata fromXContentMeta;
XContentParserConfiguration config = XContentParserConfiguration.EMPTY.withRegistry(xContentRegistry())
.withDeprecationHandler(LoggingDeprecationHandler.INSTANCE);
try (XContentParser xContentParser = XContentHelper.mapToXContentParser(config, indexMetadataMap);) {
try (XContentParser xContentParser = XContentHelper.mapToXContentParser(config, indexMetadataMap)) {
fromXContentMeta = IndexMetadata.fromXContent(xContentParser);
}
