5 changes: 5 additions & 0 deletions docs/changelog/137210.yaml
@@ -0,0 +1,5 @@
pr: 137210
summary: "[WIP] Introduce INDEX_SHARD_COUNT_FORMAT"
area: Snapshot/Restore
type: bug
issues: []
@@ -253,6 +253,7 @@ private static void assertPurposeConsistency(OperationPurpose purpose, String bl
startsWith(BlobStoreRepository.INDEX_FILE_PREFIX),
startsWith(BlobStoreRepository.METADATA_PREFIX),
startsWith(BlobStoreRepository.SNAPSHOT_PREFIX),
startsWith(BlobStoreRepository.SHARD_COUNT_PREFIX),
equalTo(BlobStoreRepository.INDEX_LATEST_BLOB),
// verification
equalTo("master.dat"),
@@ -0,0 +1,78 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.metadata;

import org.elasticsearch.common.xcontent.XContentParserUtils;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;

/**
* Stores the shard count for an index.
* Prior to v9.3, the entire {@link IndexMetadata} object was loaded into heap during snapshotting just to determine
* the shard count. As per ES-12539, this was replaced with {@link IndexShardCount}, which writes and loads only the index's
* shard count. This not only reduces the likelihood of a node going OOM, but also improves snapshotting performance.
*/
public class IndexShardCount implements ToXContentFragment {
private static final String KEY_SHARD_COUNT = "shard_count";
private final int shardCount;

public IndexShardCount(int count) {
this.shardCount = count;
}

public int getCount() {
return shardCount;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(KEY_SHARD_COUNT, shardCount);
return builder;
}

public static Builder builder() {
return new Builder();
}

public static class Builder {
private int count;

public Builder setCount(int count) {
this.count = count;
return this;
}

public IndexShardCount build() {
return new IndexShardCount(count);
}
}

public static IndexShardCount fromXContent(XContentParser parser) throws IOException {
if (parser.currentToken() == null) { // fresh parser? move to the first token
parser.nextToken();
}
if (parser.currentToken() == XContentParser.Token.START_OBJECT) { // on a start object move to next token
parser.nextToken();
}
XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.currentToken(), parser);
XContentParser.Token currentToken = parser.nextToken();
final IndexShardCount indexShardCount;
if (currentToken.isValue()) {
indexShardCount = new IndexShardCount(parser.intValue());
} else {
throw new IllegalArgumentException("Unexpected token " + currentToken);
}
XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.nextToken(), parser);
return indexShardCount;
}
}
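
As a usage note, the fragment above round-trips through an enclosing object, which is the shape fromXContent expects. A test-style sketch, not part of this diff (it assumes the usual org.elasticsearch.xcontent helpers and org.elasticsearch.common.Strings are available, as elsewhere in the codebase):

try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
    builder.startObject();
    new IndexShardCount(3).toXContent(builder, ToXContent.EMPTY_PARAMS);
    builder.endObject();
    try (XContentParser parser = XContentType.JSON.xContent()
        .createParser(XContentParserConfiguration.EMPTY, Strings.toString(builder))) {
        assert IndexShardCount.fromXContent(parser).getCount() == 3;
    }
}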
@@ -39,6 +39,7 @@
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexShardCount;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
@@ -240,6 +241,12 @@ private class ShutdownLogger {
*/
public static final String METADATA_PREFIX = "meta-";

/**
* Name prefix for the blobs that hold the {@link IndexShardCount} for each index
* @see #SHARD_COUNT_NAME_FORMAT
*/
public static final String SHARD_COUNT_PREFIX = "shard-count-";
Contributor

We can't really introduce a new blob here (it won't exist in any older repositories) but nor is there a need to do so. Instead, let's just read the existing meta-${UUID}.dat blob with a different reader that ignores everything except the shard count.

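To make the suggestion concrete, a reader along these lines could pull the shard count out of the existing metadata blob. This is a minimal sketch only: the flat token walk and the "index.number_of_shards" settings key are assumptions about the IndexMetadata X-Content layout, not the actual implementation.

// Hypothetical alternative reader: scans an IndexMetadata blob and keeps only the shard count.
public static IndexShardCount fromIndexMetadata(XContentParser parser) throws IOException {
    int shardCount = -1;
    XContentParser.Token token;
    while ((token = parser.nextToken()) != null) {
        if (token == XContentParser.Token.FIELD_NAME
            && "index.number_of_shards".equals(parser.currentName()) // assumed settings key
            && parser.nextToken().isValue()) {
            shardCount = parser.intValue(); // coerces the string-valued setting to an int
        }
    }
    if (shardCount < 0) {
        throw new IllegalArgumentException("no shard count found in index metadata blob");
    }
    return new IndexShardCount(shardCount);
}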

/**
* Name prefix for the blobs that hold top-level {@link SnapshotInfo} and shard-level {@link BlobStoreIndexShardSnapshot} metadata.
* @see #SNAPSHOT_NAME_FORMAT
@@ -260,6 +267,12 @@
*/
public static final String METADATA_NAME_FORMAT = METADATA_PREFIX + "%s" + METADATA_BLOB_NAME_SUFFIX;

/**
* Blob name format for {@link IndexShardCount} blobs.
* @see #INDEX_SHARD_COUNT_FORMAT
*/
public static final String SHARD_COUNT_NAME_FORMAT = SHARD_COUNT_PREFIX + "%s" + METADATA_BLOB_NAME_SUFFIX;

/**
* Blob name format for top-level {@link SnapshotInfo} and shard-level {@link BlobStoreIndexShardSnapshot} blobs.
* @see #SNAPSHOT_FORMAT
@@ -398,6 +411,13 @@ public static String getRepositoryDataBlobName(long repositoryGeneration) {
Function.identity()
);

public static final ChecksumBlobStoreFormat<IndexShardCount> INDEX_SHARD_COUNT_FORMAT = new ChecksumBlobStoreFormat<>(
"index-shard-count",
SHARD_COUNT_NAME_FORMAT,
(repoName, parser) -> IndexShardCount.fromXContent(parser),
Function.identity()
);

private static final String SNAPSHOT_CODEC = "snapshot";

public static final ChecksumBlobStoreFormat<SnapshotInfo> SNAPSHOT_FORMAT = new ChecksumBlobStoreFormat<>(
@@ -1327,13 +1347,14 @@ private void determineShardCount(ActionListener<Void> listener) {
private void getOneShardCount(String indexMetaGeneration) {
try {
updateShardCount(
INDEX_METADATA_FORMAT.read(getProjectRepo(), indexContainer, indexMetaGeneration, namedXContentRegistry)
.getNumberOfShards()
INDEX_SHARD_COUNT_FORMAT.read(getProjectRepo(), indexContainer, indexMetaGeneration, namedXContentRegistry)
.getCount()
);
} catch (Exception ex) {
logger.warn(() -> format("[%s] [%s] failed to read metadata for index", indexMetaGeneration, indexId.getName()), ex);
logger.warn(() -> format("[%s] [%s] failed to read shard count for index", indexMetaGeneration, indexId.getName()), ex);
// Definitely indicates something fairly badly wrong with the repo, but not immediately fatal here: we might get the
// shard count from another metadata blob, or we might just not process these shards. If we skip these shards then the
// shard count from a subsequent indexMetaGeneration, or we might just not process these shards. If we skip these shards
Contributor

I think I preferred "another metadata blob" here. "Subsequent" implies some kind of ordering but we're loading all this stuff in parallel, and might instead have got the shard count from an earlier blob. Also indexMetaGeneration is really the name of the blob rather than the blob itself, so metadata blob is more accurate.

// then the
// repository will technically enter an invalid state (these shards' index-XXX blobs will refer to snapshots that no
// longer exist) and may contain dangling blobs too. A subsequent delete that hits this index may repair the state if
// the metadata read error is transient, but if not then the stale indices cleanup will eventually remove this index
@@ -1904,21 +1925,25 @@ record RootBlobUpdateResult(RepositoryData oldRepositoryData, RepositoryData new
}
}));

// Write the index metadata for each index in the snapshot
// Write the index metadata and the shard count to memory for each index in the snapshot, so that it persists
// even if the repository is deleted
Contributor

"write to memory" is kinda odd phrasing - writes are supposed to be persistent here. Maybe this is leftover from an earlier change?

for (IndexId index : indices) {
executor.execute(ActionRunnable.run(allMetaListeners.acquire(), () -> {
final IndexMetadata indexMetaData = projectMetadata.index(index.getName());
final IndexShardCount indexShardCount = new IndexShardCount(indexMetaData.getNumberOfShards());
if (writeIndexGens) {
final String identifiers = IndexMetaDataGenerations.buildUniqueIdentifier(indexMetaData);
String metaUUID = existingRepositoryData.indexMetaDataGenerations().getIndexMetaBlobId(identifiers);
if (metaUUID == null) {
// We don't yet have this version of the metadata so we write it
metaUUID = UUIDs.base64UUID();
INDEX_SHARD_COUNT_FORMAT.write(indexShardCount, indexContainer(index), metaUUID, compress);
INDEX_METADATA_FORMAT.write(indexMetaData, indexContainer(index), metaUUID, compress);
metadataWriteResult.indexMetaIdentifiers().put(identifiers, metaUUID);
} // else this task was largely a no-op - TODO no need to fork in that case
metadataWriteResult.indexMetas().put(index, identifiers);
} else {
INDEX_SHARD_COUNT_FORMAT.write(indexShardCount, indexContainer(index), snapshotId.getUUID(), compress);
INDEX_METADATA_FORMAT.write(
clusterMetadata.getProject(getProjectId()).index(index.getName()),
indexContainer(index),
@@ -133,7 +133,9 @@ protected BlobContainer wrapChild(BlobContainer child) {
@Override
public InputStream readBlob(OperationPurpose purpose, String blobName) throws IOException {
final var pathParts = path().parts();
if (pathParts.size() == 2 && pathParts.get(0).equals("indices") && blobName.startsWith(BlobStoreRepository.METADATA_PREFIX)) {
if (pathParts.size() == 2
&& pathParts.get(0).equals("indices")
&& blobName.startsWith(BlobStoreRepository.SHARD_COUNT_PREFIX)) {
// reading the shard-count blob, so mark index as active
assertTrue(activeIndices.add(pathParts.get(1)));
assertThat(activeIndices.size(), lessThanOrEqualTo(MAX_SNAPSHOT_THREADS));
@@ -25,6 +25,7 @@ public enum RepositoryFileType {
SNAPSHOT_INFO("snap-UUID.dat"),
GLOBAL_METADATA("meta-UUID.dat"),
INDEX_METADATA("indices/UUID/meta-SHORTUUID.dat"),
INDEX_SHARD_COUNT("indices/UUID/shard-count-SHORTUUID.dat"),
SHARD_GENERATION("indices/UUID/NUM/index-UUID"),
SHARD_SNAPSHOT_INFO("indices/UUID/NUM/snap-UUID.dat"),
SHARD_DATA("indices/UUID/NUM/__UUID"),