-
Notifications
You must be signed in to change notification settings - Fork 25.5k
Limit size of shardDeleteResults #133558
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Limit size of shardDeleteResults #133558
Changes from 16 commits
9870591
97e9969
24b7a62
d888113
ee89eb2
92991b9
3190772
a16856c
381d294
203d513
daf09b6
dc70d5b
0355c2a
f072128
abb2d4c
a0d728f
654ebf2
5ef0111
bd9217b
ba81bcf
acd2182
be05a1f
8d66c1e
ce64bf5
0fa5099
3575240
d55893d
3725a3c
ed00f1a
ce6195d
37404a5
fc41d60
d1e81f7
2f0ea30
6babba9
1ff464d
0d01264
d73ffef
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -76,6 +76,7 @@ | |
import org.elasticsearch.common.settings.Setting; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.unit.ByteSizeValue; | ||
import org.elasticsearch.common.unit.RelativeByteSizeValue; | ||
import org.elasticsearch.common.util.BigArrays; | ||
import org.elasticsearch.common.util.concurrent.AbstractRunnable; | ||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; | ||
|
@@ -1006,7 +1007,8 @@ private void createSnapshotsDeletion( | |
SnapshotsServiceUtils.minCompatibleVersion(minimumNodeVersion, originalRepositoryData, snapshotIds), | ||
originalRootBlobs, | ||
blobStore().blobContainer(indicesPath()).children(OperationPurpose.SNAPSHOT_DATA), | ||
originalRepositoryData | ||
originalRepositoryData, | ||
metadata.settings() | ||
); | ||
})); | ||
} | ||
|
@@ -1075,6 +1077,7 @@ class SnapshotsDeletion { | |
* {@link RepositoryData} blob newer than the one identified by {@link #originalRepositoryDataGeneration}. | ||
*/ | ||
private final RepositoryData originalRepositoryData; | ||
private final Settings settings; | ||
|
||
/** | ||
* Executor to use for all repository interactions. | ||
|
@@ -1096,15 +1099,16 @@ class SnapshotsDeletion { | |
/** | ||
* Tracks the shard-level blobs which can be deleted once all the metadata updates have completed. | ||
*/ | ||
private final ShardBlobsToDelete shardBlobsToDelete = new ShardBlobsToDelete(); | ||
private final ShardBlobsToDelete shardBlobsToDelete; | ||
|
||
SnapshotsDeletion( | ||
Collection<SnapshotId> snapshotIds, | ||
long originalRepositoryDataGeneration, | ||
IndexVersion repositoryFormatIndexVersion, | ||
Map<String, BlobMetadata> originalRootBlobs, | ||
Map<String, BlobContainer> originalIndexContainers, | ||
RepositoryData originalRepositoryData | ||
RepositoryData originalRepositoryData, | ||
Settings settings | ||
) { | ||
this.snapshotIds = snapshotIds; | ||
this.originalRepositoryDataGeneration = originalRepositoryDataGeneration; | ||
|
@@ -1113,6 +1117,9 @@ class SnapshotsDeletion { | |
this.originalRootBlobs = originalRootBlobs; | ||
this.originalIndexContainers = originalIndexContainers; | ||
this.originalRepositoryData = originalRepositoryData; | ||
this.settings = settings; | ||
|
||
shardBlobsToDelete = new ShardBlobsToDelete(this.settings); | ||
} | ||
|
||
// --------------------------------------------------------------------------------------------------------------------------------- | ||
|
@@ -1477,6 +1484,7 @@ private void cleanupUnlinkedShardLevelBlobs(ActionListener<Void> listener) { | |
listener.onResponse(null); | ||
return; | ||
} | ||
|
||
snapshotExecutor.execute(ActionRunnable.wrap(listener, l -> { | ||
try { | ||
deleteFromContainer(OperationPurpose.SNAPSHOT_DATA, blobContainer(), filesToDelete); | ||
|
@@ -1666,6 +1674,7 @@ void writeTo(StreamOutput out) throws IOException { | |
} | ||
} | ||
|
||
private final int shardDeleteResultsMaxSize; | ||
/** | ||
* <p> | ||
* Shard-level results, i.e. a sequence of {@link ShardSnapshotMetaDeleteResult} objects, except serialized, concatenated, and | ||
|
@@ -1678,26 +1687,63 @@ void writeTo(StreamOutput out) throws IOException { | |
* need no further synchronization. | ||
* </p> | ||
*/ | ||
// If the size of this continues to be a problem even after compression, consider either a hard limit on its size (preferring leaked | ||
// blobs over an OOME on the master) or else offloading it to disk or to the repository itself. | ||
private final BytesStreamOutput shardDeleteResults = new ReleasableBytesStreamOutput(bigArrays); | ||
private final BytesStreamOutput shardDeleteResults; | ||
|
||
private int resultCount = 0; | ||
|
||
private final StreamOutput compressed = new OutputStreamStreamOutput( | ||
new BufferedOutputStream( | ||
new DeflaterOutputStream(Streams.flushOnCloseStream(shardDeleteResults)), | ||
DeflateCompressor.BUFFER_SIZE | ||
) | ||
); | ||
private final StreamOutput compressed; | ||
|
||
private final ArrayList<Closeable> resources = new ArrayList<>(); | ||
|
||
private final ShardGenerations.Builder shardGenerationsBuilder = ShardGenerations.builder(); | ||
|
||
ShardBlobsToDelete() { | ||
resources.add(compressed); | ||
resources.add(LeakTracker.wrap((Releasable) shardDeleteResults)); | ||
private final String SHARD_DELETE_RESULTS_MAX_SIZE_SETTING_NAME = "repositories.blobstore.max_shard_delete_results_size"; | ||
private final Setting<RelativeByteSizeValue> MAX_SHARD_DELETE_RESULTS_SIZE_SETTING = new Setting<>( | ||
SHARD_DELETE_RESULTS_MAX_SIZE_SETTING_NAME, | ||
"25%", | ||
s -> RelativeByteSizeValue.parseRelativeByteSizeValue(s, SHARD_DELETE_RESULTS_MAX_SIZE_SETTING_NAME), | ||
Setting.Property.NodeScope | ||
); | ||
|
||
ShardBlobsToDelete(Settings settings) { | ||
this.shardDeleteResultsMaxSize = calculateMaximumShardDeleteResultsSize(settings); | ||
if (this.shardDeleteResultsMaxSize > 0) { | ||
this.shardDeleteResults = new ReleasableBytesStreamOutput(bigArrays); | ||
this.compressed = new OutputStreamStreamOutput( | ||
new BufferedOutputStream( | ||
new DeflaterOutputStream(Streams.flushOnCloseStream(shardDeleteResults)), | ||
DeflateCompressor.BUFFER_SIZE | ||
) | ||
); | ||
resources.add(compressed); | ||
resources.add(LeakTracker.wrap((Releasable) shardDeleteResults)); | ||
} else { | ||
this.shardDeleteResults = null; | ||
this.compressed = null; | ||
} | ||
|
||
} | ||
|
||
/** | ||
* Calculates the maximum size of the shardDeleteResults BytesStreamOutput. | ||
* The size should at most be 2GB, but no more than 25% of the total remaining heap space | ||
* @return The maximum number of bytes the shardDeleteResults BytesStreamOutput can consume in the heap | ||
*/ | ||
int calculateMaximumShardDeleteResultsSize(Settings settings) { | ||
DaveCTurner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
RelativeByteSizeValue configuredValue = MAX_SHARD_DELETE_RESULTS_SIZE_SETTING.get(settings); | ||
|
||
long maxAllowedSizeInBytes; | ||
if (configuredValue.isAbsolute()) { | ||
maxAllowedSizeInBytes = configuredValue.getAbsolute().getBytes(); | ||
} else { | ||
maxAllowedSizeInBytes = configuredValue.calculateValue(ByteSizeValue.ofBytes(Runtime.getRuntime().maxMemory()), null) | ||
.getBytes(); | ||
} | ||
|
||
if (maxAllowedSizeInBytes > Integer.MAX_VALUE) { | ||
return Integer.MAX_VALUE; | ||
DaveCTurner marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
} | ||
return (int) maxAllowedSizeInBytes; | ||
} | ||
|
||
synchronized void addShardDeleteResult( | ||
|
@@ -1706,10 +1752,33 @@ synchronized void addShardDeleteResult( | |
ShardGeneration newGeneration, | ||
Collection<String> blobsToDelete | ||
) { | ||
if (compressed == null) { | ||
// No output stream: skip writing, but still update generations | ||
DaveCTurner marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
shardGenerationsBuilder.put(indexId, shardId, newGeneration); | ||
return; | ||
} | ||
try { | ||
shardGenerationsBuilder.put(indexId, shardId, newGeneration); | ||
new ShardSnapshotMetaDeleteResult(Objects.requireNonNull(indexId.getId()), shardId, blobsToDelete).writeTo(compressed); | ||
resultCount += 1; | ||
|
||
// Calculate how many bytes we want to write | ||
int bytesToWriteIndexId = StreamOutput.bytesInString(indexId.getId()); | ||
int bytesToWriteShardId = StreamOutput.bytesInVInt(shardId); | ||
int bytesToWriteBlobsToDelete = StreamOutput.bytesInStringCollection(blobsToDelete); | ||
int totalBytesRequired = bytesToWriteIndexId + bytesToWriteShardId + bytesToWriteBlobsToDelete; | ||
DaveCTurner marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
||
// Only perform the write if there is capacity left | ||
if (shardDeleteResults.size() + totalBytesRequired <= shardDeleteResultsMaxSize) { | ||
new ShardSnapshotMetaDeleteResult(Objects.requireNonNull(indexId.getId()), shardId, blobsToDelete).writeTo(compressed); | ||
resultCount += 1; | ||
} else { | ||
logger.warn( | ||
"Failure to clean up the following dangling blobs, {}, for index {} and shard {}", | ||
blobsToDelete, | ||
indexId, | ||
shardId | ||
); | ||
|
||
compressed.close(); | ||
} | ||
} catch (IOException e) { | ||
assert false : e; // no IO actually happens here | ||
throw new UncheckedIOException(e); | ||
|
@@ -1721,6 +1790,10 @@ public ShardGenerations getUpdatedShardGenerations() { | |
} | ||
|
||
public Iterator<String> getBlobPaths() { | ||
if (compressed == null || shardDeleteResults == null) { | ||
// No output stream: nothing to return | ||
|
||
return Collections.emptyIterator(); | ||
} | ||
final StreamInput input; | ||
try { | ||
compressed.close(); | ||
|
@@ -1750,6 +1823,9 @@ public Iterator<String> getBlobPaths() { | |
|
||
@Override | ||
public void close() { | ||
if (resources.isEmpty()) { | ||
return; | ||
} | ||
|
||
try { | ||
IOUtils.close(resources); | ||
} catch (IOException e) { | ||
|
@@ -1760,7 +1836,7 @@ public void close() { | |
|
||
// exposed for tests | ||
int sizeInBytes() { | ||
return shardDeleteResults.size(); | ||
return shardDeleteResults == null ? 0 : shardDeleteResults.size(); | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the "Elastic License | ||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
* Public License v 1"; you may not use this file except in compliance with, at | ||
* your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
* License v3.0 only", or the "Server Side Public License, v 1". | ||
*/ | ||
|
||
package org.elasticsearch.common.io.stream; | ||
|
||
import org.elasticsearch.test.ESTestCase; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.List; | ||
|
||
import static org.elasticsearch.common.io.stream.StreamOutput.bytesInString; | ||
import static org.elasticsearch.common.io.stream.StreamOutput.bytesInStringCollection; | ||
import static org.elasticsearch.common.io.stream.StreamOutput.bytesInVInt; | ||
|
||
public class StreamOutputTests extends ESTestCase { | ||
|
||
public void testBytesInVInt() { | ||
// 0..127 should use the single byte shortcut | ||
assertEquals(1, bytesInVInt(0)); | ||
assertEquals(1, bytesInVInt(127)); | ||
assertEquals(1, bytesInVInt(randomIntBetween(0, 127))); | ||
|
||
assertEquals(2, bytesInVInt(128)); | ||
assertEquals(2, bytesInVInt(16383)); | ||
assertEquals(2, bytesInVInt(randomIntBetween(128, 16383))); | ||
|
||
assertEquals(3, bytesInVInt(16384)); | ||
assertEquals(3, bytesInVInt(2097151)); | ||
assertEquals(3, bytesInVInt(randomIntBetween(16384, 2097151))); | ||
|
||
assertEquals(4, bytesInVInt(2097152)); | ||
assertEquals(4, bytesInVInt(268435455)); | ||
assertEquals(4, bytesInVInt(randomIntBetween(2097152, 268435455))); | ||
|
||
assertEquals(5, bytesInVInt(268435456)); | ||
assertEquals(5, bytesInVInt(Integer.MAX_VALUE)); | ||
assertEquals(5, bytesInVInt(randomIntBetween(268435456, Integer.MAX_VALUE))); | ||
} | ||
|
||
public void testBytesInString() { | ||
assertEquals(1, bytesInString("")); | ||
|
||
// Since the length of the string is stored as a VInt, this has additional bytes | ||
int randomSize = randomIntBetween(0, 256); | ||
int vintBytes = randomSize <= 127 ? 1 : 2; | ||
assertEquals(randomSize + vintBytes, bytesInString(randomAlphaOfLength(randomSize))); | ||
|
||
// This is U+00E9, and 2 bytes in UTF-8 | ||
String s = "é"; | ||
assertEquals(2 + 1, bytesInString(s)); | ||
|
||
// A mixed string of 1, 2 and 3 bytes respectively | ||
s = "aéअ"; | ||
assertEquals(6 + 1, bytesInString(s)); | ||
} | ||
|
||
public void testBytesInStringCollection() { | ||
assertEquals(1, bytesInStringCollection(Collections.emptyList())); | ||
|
||
List<String> listOfStrings = new ArrayList<>(); | ||
int randomSize = randomIntBetween(0, 256); | ||
// Start with the number of bytes needed to store the size of the collection | ||
int totalStringBytes = randomSize <= 127 ? 1 : 2; | ||
for (int i = 0; i < randomSize; i++) { | ||
int lengthOfString = randomIntBetween(0, 256); | ||
// The size of the string is stored as a VInt | ||
int stringBytes = lengthOfString <= 127 ? 1 : 2; | ||
StringBuilder s = new StringBuilder(); | ||
for (int j = 0; j < lengthOfString; j++) { | ||
int characterBytes = randomIntBetween(1, 3); | ||
if (characterBytes == 1) { | ||
s.append(randomAlphaOfLength(1)); | ||
} else if (characterBytes == 2) { | ||
s.append("é"); | ||
} else { | ||
s.append("अ"); | ||
} | ||
stringBytes += characterBytes; | ||
} | ||
|
||
listOfStrings.add(s.toString()); | ||
totalStringBytes += stringBytes; | ||
} | ||
|
||
assertEquals(totalStringBytes, bytesInStringCollection(listOfStrings)); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we make this a dynamic setting too?