Skip to content

Commit 56f05de

Browse files
Introduce INDEX_SHARD_COUNT_FORMAT (#137210)
Replaces the existing `ChecksumBlobStoreFormat` `INDEX_METADATA_FORMAT` inside `BlobStoreRepository` with a new `INDEX_SHARD_COUNT_FORMAT`. This removes the need for writing an entire `IndexMetadata` object to, and reading from, heap memory. Instead, we read from the heap only the shard count for an index, reducing the total heap memory needed for snapshotting and reducing the likelihood a node will go OOMe. Closes ES-12538
1 parent eef5065 commit 56f05de

File tree

6 files changed

+411
-10
lines changed

6 files changed

+411
-10
lines changed

docs/changelog/137210.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 137210
2+
summary: "Introduce INDEX_SHARD_COUNT_FORMAT"
3+
area: Snapshot/Restore
4+
type: bug
5+
issues:
6+
- 131822

qa/repository-multi-version/src/test/java/org/elasticsearch/upgrades/MultiVersionRepositoryAccessIT.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.io.IOException;
2828
import java.io.InputStream;
2929
import java.net.HttpURLConnection;
30+
import java.util.ArrayList;
3031
import java.util.List;
3132
import java.util.Map;
3233
import java.util.stream.Collectors;
@@ -227,6 +228,49 @@ public void testUpgradeMovesRepoToNewMetaVersion() throws IOException {
227228
}
228229
}
229230

231+
public void testSnapshotCreatedInOldVersionCanBeDeletedInNew() throws IOException {
232+
final String repoName = getTestName();
233+
try {
234+
final int shards = 3;
235+
final String index = "test-index";
236+
createIndex(index, shards);
237+
final IndexVersion minNodeVersion = minimumIndexVersion();
238+
// 7.12.0+ will try to load RepositoryData during repo creation if verify is true, which is impossible in case of version
239+
// incompatibility in the downgrade test step.
240+
final boolean verify = TEST_STEP != TestStep.STEP3_OLD_CLUSTER
241+
|| SnapshotsServiceUtils.includesUUIDs(minNodeVersion)
242+
|| minNodeVersion.before(IndexVersions.V_7_12_0);
243+
createRepository(repoName, false, verify);
244+
245+
// Create snapshots in the first step
246+
if (TEST_STEP == TestStep.STEP1_OLD_CLUSTER) {
247+
int numberOfSnapshots = randomIntBetween(5, 10);
248+
for (int i = 0; i < numberOfSnapshots; i++) {
249+
createSnapshot(repoName, "snapshot-" + i, index);
250+
}
251+
final List<Map<String, Object>> snapshots = listSnapshots(repoName);
252+
assertSnapshotStatusSuccessful(repoName, snapshots.stream().map(sn -> (String) sn.get("snapshot")).toArray(String[]::new));
253+
} else if (TEST_STEP == TestStep.STEP2_NEW_CLUSTER) {
254+
final List<Map<String, Object>> snapshots = listSnapshots(repoName);
255+
List<String> snapshotNames = new ArrayList<>(snapshots.stream().map(sn -> (String) sn.get("snapshot")).toList());
256+
257+
// Delete a single snapshot
258+
deleteSnapshot(repoName, snapshotNames.removeFirst());
259+
260+
// Delete a bulk number of snapshots, avoiding the case where we delete all snapshots since this invokes
261+
// cleanup code and bulk snapshot deletion logic which is tested in testUpgradeMovesRepoToNewMetaVersion
262+
final List<String> snapshotsToDeleteInBulk = randomSubsetOf(randomIntBetween(1, snapshotNames.size() - 1), snapshotNames);
263+
deleteSnapshots(repoName, snapshotsToDeleteInBulk);
264+
snapshotNames.removeAll(snapshotsToDeleteInBulk);
265+
266+
// Delete the rest of the snapshots (will invoke bulk snapshot deletion logic)
267+
deleteSnapshots(repoName, snapshotNames);
268+
}
269+
} finally {
270+
deleteRepository(repoName);
271+
}
272+
}
273+
230274
private static void assertSnapshotStatusSuccessful(String repoName, String... snapshots) throws IOException {
231275
Request statusReq = new Request("GET", "/_snapshot/" + repoName + "/" + String.join(",", snapshots) + "/_status");
232276
ObjectPath statusResp = ObjectPath.createFromResponse(client().performRequest(statusReq));
@@ -235,6 +279,12 @@ private static void assertSnapshotStatusSuccessful(String repoName, String... sn
235279
}
236280
}
237281

282+
private void deleteSnapshots(String repoName, List<String> names) throws IOException {
283+
assertAcknowledged(
284+
client().performRequest(new Request("DELETE", "/_snapshot/" + repoName + "/" + Strings.collectionToCommaDelimitedString(names)))
285+
);
286+
}
287+
238288
private void deleteSnapshot(String repoName, String name) throws IOException {
239289
assertAcknowledged(client().performRequest(new Request("DELETE", "/_snapshot/" + repoName + "/" + name)));
240290
}

server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -587,7 +587,7 @@ public Iterator<Setting<?>> settings() {
587587
static final String KEY_SETTINGS_VERSION = "settings_version";
588588
static final String KEY_ALIASES_VERSION = "aliases_version";
589589
static final String KEY_ROUTING_NUM_SHARDS = "routing_num_shards";
590-
static final String KEY_SETTINGS = "settings";
590+
public static final String KEY_SETTINGS = "settings";
591591
static final String KEY_STATE = "state";
592592
static final String KEY_MAPPINGS = "mappings";
593593
static final String KEY_MAPPINGS_HASH = "mappings_hash";
@@ -596,7 +596,7 @@ public Iterator<Setting<?>> settings() {
596596
static final String KEY_MAPPINGS_UPDATED_VERSION = "mappings_updated_version";
597597
static final String KEY_SYSTEM = "system";
598598
static final String KEY_TIMESTAMP_RANGE = "timestamp_range";
599-
static final String KEY_EVENT_INGESTED_RANGE = "event_ingested_range";
599+
public static final String KEY_EVENT_INGESTED_RANGE = "event_ingested_range";
600600
public static final String KEY_PRIMARY_TERMS = "primary_terms";
601601
public static final String KEY_STATS = "stats";
602602

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,19 @@ public static String getRepositoryDataBlobName(long repositoryGeneration) {
399399
Function.identity()
400400
);
401401

402+
/**
403+
* Parses only the shard count from the IndexMetadata object written by INDEX_METADATA_FORMAT (#131822)
404+
*/
405+
public static final ChecksumBlobStoreFormat<IndexShardCount> INDEX_SHARD_COUNT_FORMAT = new ChecksumBlobStoreFormat<>(
406+
"index-metadata",
407+
METADATA_NAME_FORMAT,
408+
(repoName, parser) -> IndexShardCount.fromIndexMetadata(parser),
409+
(ignored) -> {
410+
assert false;
411+
throw new UnsupportedOperationException();
412+
}
413+
);
414+
402415
private static final String SNAPSHOT_CODEC = "snapshot";
403416

404417
public static final ChecksumBlobStoreFormat<SnapshotInfo> SNAPSHOT_FORMAT = new ChecksumBlobStoreFormat<>(
@@ -1328,17 +1341,16 @@ private void determineShardCount(ActionListener<Void> listener) {
13281341
private void getOneShardCount(String indexMetaGeneration) {
13291342
try {
13301343
updateShardCount(
1331-
INDEX_METADATA_FORMAT.read(getProjectRepo(), indexContainer, indexMetaGeneration, namedXContentRegistry)
1332-
.getNumberOfShards()
1344+
INDEX_SHARD_COUNT_FORMAT.read(getProjectRepo(), indexContainer, indexMetaGeneration, namedXContentRegistry).count()
13331345
);
13341346
} catch (Exception ex) {
1335-
logger.warn(() -> format("[%s] [%s] failed to read metadata for index", indexMetaGeneration, indexId.getName()), ex);
1347+
logger.warn(() -> format("[%s] [%s] failed to read shard count for index", indexMetaGeneration, indexId.getName()), ex);
13361348
// Definitely indicates something fairly badly wrong with the repo, but not immediately fatal here: we might get the
1337-
// shard count from another metadata blob, or we might just not process these shards. If we skip these shards then the
1338-
// repository will technically enter an invalid state (these shards' index-XXX blobs will refer to snapshots that no
1339-
// longer exist) and may contain dangling blobs too. A subsequent delete that hits this index may repair the state if
1340-
// the metadata read error is transient, but if not then the stale indices cleanup will eventually remove this index
1341-
// and all its extra data anyway.
1349+
// shard count from another metadata blob, or we might just not process these shards. If we skip these shards
1350+
// then the repository will technically enter an invalid state (these shards' index-XXX blobs will refer to snapshots
1351+
// that no longer exist) and may contain dangling blobs too. A subsequent delete that hits this index may repair
1352+
// the state if the metadata read error is transient, but if not then the stale indices cleanup will eventually
1353+
// remove this index and all its extra data anyway.
13421354
// TODO: Should we fail the delete here? See https://github.com/elastic/elasticsearch/issues/100569.
13431355
}
13441356
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.repositories.blobstore;
11+
12+
import org.elasticsearch.cluster.metadata.IndexMetadata;
13+
import org.elasticsearch.common.xcontent.XContentParserUtils;
14+
import org.elasticsearch.xcontent.XContentParser;
15+
16+
import java.io.IOException;
17+
18+
import static org.elasticsearch.cluster.metadata.IndexMetadata.KEY_SETTINGS;
19+
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
20+
21+
/**
22+
* A subset of {@link IndexMetadata} storing only the shard count of an index
23+
*/
24+
public record IndexShardCount(int count) {
25+
/**
26+
* Parses an {@link IndexMetadata} object, reading only the shard count and skipping the rest.
27+
* Assumes that the settings object is flat, and not nested.
28+
* @param parser The parser of the {@link IndexMetadata} object
29+
* @return Returns an {@link IndexShardCount} containing the shard count for the index
30+
* @throws IOException Thrown if the {@link IndexMetadata} object cannot be parsed correctly
31+
*/
32+
public static IndexShardCount fromIndexMetadata(XContentParser parser) throws IOException {
33+
parser.nextToken(); // fresh parser so move to the first token
34+
parser.nextToken(); // on a start object move to next token
35+
XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.currentToken(), parser);
36+
String currentFieldName;
37+
XContentParser.Token token = parser.nextToken();
38+
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser);
39+
40+
IndexShardCount indexShardCount = null;
41+
// Skip over everything except the index.number_of_shards setting, or any unexpected tokens
42+
while ((currentFieldName = parser.nextFieldName()) != null) {
43+
token = parser.nextToken();
44+
if (token == XContentParser.Token.START_OBJECT) {
45+
if (currentFieldName.equals(KEY_SETTINGS)) {
46+
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
47+
String fieldName = parser.currentName();
48+
parser.nextToken();
49+
if (SETTING_NUMBER_OF_SHARDS.equals(fieldName)) {
50+
assert indexShardCount == null : "number_of_shards setting encountered multiple times in index settings";
51+
indexShardCount = new IndexShardCount(parser.intValue());
52+
} else if (parser.currentToken() == XContentParser.Token.START_OBJECT) {
53+
// Settings should be flat, not nested
54+
throw new IllegalArgumentException(
55+
"Settings object contains nested object for key [" + fieldName + "], expected flat settings map"
56+
);
57+
} else {
58+
parser.skipChildren();
59+
}
60+
}
61+
} else {
62+
parser.skipChildren();
63+
}
64+
} else if (token == XContentParser.Token.START_ARRAY) {
65+
parser.skipChildren();
66+
} else if (token.isValue() == false) {
67+
throw new IllegalArgumentException("Unexpected token " + token);
68+
}
69+
}
70+
XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.nextToken(), parser);
71+
72+
// indexShardCount is null if corruption when parsing
73+
return indexShardCount != null ? indexShardCount : new IndexShardCount(-1);
74+
}
75+
}

0 commit comments

Comments
 (0)