Commit 236c9fe
Limit size of shardDeleteResults (#133558)

Modifies `BlobStoreRepository.ShardBlobsToDelete.shardDeleteResults` to have a variable size depending on the remaining heap space, rather than the hard-coded 2GB limit, which caused smaller nodes with less heap space to hit an OOME.

Relates to #131822
Closes ES-12540

1 parent d8ae9ae, commit 236c9fe

File tree: 7 files changed, +408, -69 lines

server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java

1 addition, 1 deletion

@@ -72,7 +72,7 @@
  * it means that the "barrier to entry" for adding new methods to this class is relatively low even though it is a shared class with code
  * everywhere. That being said, this class deals primarily with {@code List}s rather than Arrays. For the most part calls should adapt to
  * lists, either by storing {@code List}s internally or just converting to and from a {@code List} when calling. This comment is repeated
- * on {@link StreamInput}.
+ * on {@link StreamOutput}.
  */
 public abstract class StreamInput extends InputStream {

server/src/main/java/org/elasticsearch/common/io/stream/TruncatedOutputStream.java (new file)

85 additions, 0 deletions

@@ -0,0 +1,85 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.common.io.stream;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.function.IntSupplier;
+
+/**
+ * Truncates writes once the max size is exceeded.
+ * However, when writing byte arrays, the stream does not check whether there is capacity for the full
+ * array prior to writing, so there is overspill of up to b.length - 1 bytes.
+ */
+public class TruncatedOutputStream extends FilterOutputStream {
+    private final IntSupplier currentSizeSupplier;
+    private final int maxSize;
+    private boolean hasCapacity = true;
+
+    public TruncatedOutputStream(OutputStream out, IntSupplier currentSizeSupplier, int maxSize) {
+        super(out);
+        this.currentSizeSupplier = currentSizeSupplier;
+        this.maxSize = maxSize;
+    }
+
+    /**
+     * @return true if there is at least one byte of space left to write
+     */
+    public boolean hasCapacity() {
+        if (hasCapacity) {
+            hasCapacity = currentSizeSupplier.getAsInt() < maxSize;
+        }
+        return hasCapacity;
+    }
+
+    /**
+     * If there is at least one byte of space left in the stream, writes the byte.
+     * @param b the byte to write to the underlying stream
+     * @throws IOException if an I/O error occurs; in particular, an IOException
+     *                     may be thrown if the output stream has been closed
+     */
+    @Override
+    public void write(int b) throws IOException {
+        if (hasCapacity()) {
+            out.write(b);
+        }
+    }
+
+    /**
+     * If there is at least one byte of space left in the stream, writes the whole byte array.
+     * Therefore, up to b.length - 1 bytes will overspill.
+     * @param b the bytes to write to the underlying stream
+     * @throws IOException if an I/O error occurs; in particular, an IOException
+     *                     may be thrown if the output stream has been closed
+     */
+    @Override
+    public void write(byte[] b) throws IOException {
+        if (hasCapacity()) {
+            out.write(b);
+        }
+    }
+
+    /**
+     * If there is at least one byte of space left in the stream, writes the given slice of the byte array.
+     * Therefore, up to len - 1 bytes will overspill.
+     * @param b the byte array to write from
+     * @param off the index of the first byte to write
+     * @param len the number of bytes to write
+     * @throws IOException if an I/O error occurs; in particular, an IOException
+     *                     may be thrown if the output stream has been closed
+     */
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        if (hasCapacity()) {
+            out.write(b, off, len);
+        }
+    }
+}
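A minimal illustration of the truncation and overspill behavior described in the class javadoc (not part of the commit; the demo class name is hypothetical, and ByteArrayOutputStream stands in for the real backing stream):

    import org.elasticsearch.common.io.stream.TruncatedOutputStream;

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    public class TruncationDemo {
        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream backing = new ByteArrayOutputStream();
            // Cap the stream at 4 bytes, measuring size via the backing buffer
            TruncatedOutputStream out = new TruncatedOutputStream(backing, backing::size, 4);

            out.write(new byte[] { 1, 2, 3 }); // size 0 < 4: whole array written
            out.write(new byte[] { 4, 5, 6 }); // size 3 < 4: whole array written (2 bytes overspill)
            out.write(new byte[] { 7 });       // size 6 >= 4: truncated, nothing written

            System.out.println(backing.size()); // prints 6: {1, 2, 3, 4, 5, 6}
        }
    }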

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

3 additions, 1 deletion

@@ -120,6 +120,7 @@
 import org.elasticsearch.persistent.decider.EnableAssignmentDecider;
 import org.elasticsearch.plugins.PluginsService;
 import org.elasticsearch.readiness.ReadinessService;
+import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
 import org.elasticsearch.repositories.fs.FsRepository;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.script.ScriptService;

@@ -653,6 +654,7 @@ public void apply(Settings value, Settings current, Settings previous) {
             WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_QUEUE_LATENCY_THRESHOLD_SETTING,
             WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING,
             WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_MINIMUM_LOGGING_INTERVAL,
-            SamplingService.TTL_POLL_INTERVAL_SETTING
+            SamplingService.TTL_POLL_INTERVAL_SETTING,
+            BlobStoreRepository.MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING
         );
     }
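Because the setting is registered here and declared with Setting.Property.Dynamic (see the BlobStoreRepository diff below), it can be changed at runtime. A sketch of what that would look like via the standard cluster settings API (the value "10%" is illustrative, not from the commit):

    PUT _cluster/settings
    {
      "persistent": {
        "repositories.blobstore.max_heap_size_for_snapshot_deletion": "10%"
      }
    }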

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

78 additions, 16 deletions

@@ -71,10 +71,13 @@
 import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.TruncatedOutputStream;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
+import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -432,6 +435,18 @@ public static String getRepositoryDataBlobName(long repositoryGeneration) {
         Setting.Property.NodeScope
     );

+    /**
+     * Defines the max size of the ShardBlobsToDelete.shardDeleteResults stream as a percentage of available heap memory.
+     * This is a cluster-level setting.
+     */
+    public static final Setting<ByteSizeValue> MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING = Setting.memorySizeSetting(
+        "repositories.blobstore.max_heap_size_for_snapshot_deletion",
+        "25%",
+        Setting.Property.Dynamic,
+        Setting.Property.NodeScope
+    );
+    private volatile int maxHeapSizeForSnapshotDeletion;
+
     /**
      * Repository settings that can be updated dynamically without having to create a new repository.
      */
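Setting.memorySizeSetting interprets a percentage default relative to the JVM's max heap, so the effective default scales with node size. A rough illustration under assumed heap sizes (not from the commit):

    // On a master node started with -Xmx1g, the "25%" default resolves to ~256MB,
    // capping shardDeleteResults well below the heap. On -Xmx32g it resolves to
    // ~8GB, which the watcher in the constructor below clamps to just under 2GB.
    ByteSizeValue configured = MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING.get(Settings.EMPTY);
    long bytes = configured.getBytes(); // ~ Runtime.getRuntime().maxMemory() / 4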
@@ -546,6 +561,18 @@ protected BlobStoreRepository(
             threadPool.executor(ThreadPool.Names.SNAPSHOT)
         );
         this.blobStoreSnapshotMetrics = new BlobStoreSnapshotMetrics(projectId, metadata, snapshotMetrics);
+        ClusterSettings clusterSettings = clusterService.getClusterSettings();
+        clusterSettings.initializeAndWatch(MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING, maxHeapSizeForSnapshotDeletion -> {
+            /*
+             * Calculates the maximum size of the ShardBlobsToDelete.shardDeleteResults BytesStreamOutput.
+             * The size cannot exceed 2GB without {@code BytesStreamOutput} throwing an IAE,
+             * but should also be no more than the configured share (25% by default) of the total heap space.
+             * A buffer of 1MB is maintained, so that even if the stream is at max size, there is room to flush.
+             */
+            this.maxHeapSizeForSnapshotDeletion = Math.toIntExact(
+                Math.min(maxHeapSizeForSnapshotDeletion.getBytes(), Integer.MAX_VALUE - ByteSizeUnit.MB.toBytes(1))
+            );
+        });
     }

     @Override
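To make the clamp concrete, a worked example with an assumed heap size (illustrative numbers, not from the commit):

    // Assume a 16GB heap, so the "25%" default configures 4GB:
    long configuredBytes = ByteSizeUnit.GB.toBytes(4);               // 4294967296
    long cap = Integer.MAX_VALUE - ByteSizeUnit.MB.toBytes(1);       // 2146435071 (~2GB - 1MB)
    int effective = Math.toIntExact(Math.min(configuredBytes, cap)); // clamped to 2146435071
    // The 1MB of headroom below Integer.MAX_VALUE leaves room to flush the compressed
    // stream even when it is at max size, without BytesStreamOutput overflowing its 2GB limit.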
@@ -1678,24 +1705,26 @@ void writeTo(StreamOutput out) throws IOException {
      * need no further synchronization.
      * </p>
      */
-    // If the size of this continues to be a problem even after compression, consider either a hard limit on its size (preferring leaked
-    // blobs over an OOME on the master) or else offloading it to disk or to the repository itself.
-    private final BytesStreamOutput shardDeleteResults = new ReleasableBytesStreamOutput(bigArrays);
-
-    private int resultCount = 0;
-
-    private final StreamOutput compressed = new OutputStreamStreamOutput(
-        new BufferedOutputStream(
-            new DeflaterOutputStream(Streams.flushOnCloseStream(shardDeleteResults)),
-            DeflateCompressor.BUFFER_SIZE
-        )
-    );
+    private final BytesStreamOutput shardDeleteResults;
+    private final TruncatedOutputStream truncatedShardDeleteResultsOutputStream;
+    private final StreamOutput compressed;

+    private int resultsCount = 0;
+    private int leakedBlobsCount = 0;
     private final ArrayList<Closeable> resources = new ArrayList<>();
-
     private final ShardGenerations.Builder shardGenerationsBuilder = ShardGenerations.builder();

     ShardBlobsToDelete() {
+        this.shardDeleteResults = new ReleasableBytesStreamOutput(bigArrays);
+        this.truncatedShardDeleteResultsOutputStream = new TruncatedOutputStream(
+            new BufferedOutputStream(
+                new DeflaterOutputStream(Streams.flushOnCloseStream(shardDeleteResults)),
+                DeflateCompressor.BUFFER_SIZE
+            ),
+            shardDeleteResults::size,
+            maxHeapSizeForSnapshotDeletion
+        );
+        this.compressed = new OutputStreamStreamOutput(this.truncatedShardDeleteResultsOutputStream);
         resources.add(compressed);
         resources.add(LeakTracker.wrap((Releasable) shardDeleteResults));
     }
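For orientation, the write path assembled in this constructor, outermost to innermost (a sketch of the layering, not code from the commit):

    compressed (OutputStreamStreamOutput)
      -> TruncatedOutputStream     // drops writes once shardDeleteResults.size() >= maxHeapSizeForSnapshotDeletion
        -> BufferedOutputStream    // batches writes (DeflateCompressor.BUFFER_SIZE)
          -> DeflaterOutputStream  // compresses
            -> shardDeleteResults  // ReleasableBytesStreamOutput backed by bigArrays

Note that the capacity check reads the size of the backing buffer, i.e. the compressed size, so bytes sitting in the BufferedOutputStream are not counted until they are flushed through.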
@@ -1708,14 +1737,37 @@ synchronized void addShardDeleteResult(
     ) {
         try {
             shardGenerationsBuilder.put(indexId, shardId, newGeneration);
-            new ShardSnapshotMetaDeleteResult(Objects.requireNonNull(indexId.getId()), shardId, blobsToDelete).writeTo(compressed);
-            resultCount += 1;
+            // writeBlobsIfCapacity returns false if the write was truncated
+            if (writeBlobsIfCapacity(indexId, shardId, blobsToDelete) == false) {
+                logger.debug(
+                    "Unable to clean up the following dangling blobs, {}, for index {} and shard {} "
+                        + "due to insufficient heap space on the master node.",
+                    blobsToDelete,
+                    indexId,
+                    shardId
+                );
+                leakedBlobsCount += blobsToDelete.size();
+            }
         } catch (IOException e) {
             assert false : e; // no IO actually happens here
             throw new UncheckedIOException(e);
         }
     }

+    private boolean writeBlobsIfCapacity(IndexId indexId, int shardId, Collection<String> blobsToDelete) throws IOException {
+        // There is a minimum of 1 byte available for writing
+        if (this.truncatedShardDeleteResultsOutputStream.hasCapacity()) {
+            new ShardSnapshotMetaDeleteResult(Objects.requireNonNull(indexId.getId()), shardId, blobsToDelete).writeTo(compressed);
+            // We only want to read this shard delete result if we were able to write the entire object.
+            // Otherwise, for partial writes, an EOFException will be thrown upon reading.
+            if (this.truncatedShardDeleteResultsOutputStream.hasCapacity()) {
+                resultsCount += 1;
+                return true;
+            }
+        }
+        return false;
+    }
+
     public ShardGenerations getUpdatedShardGenerations() {
         return shardGenerationsBuilder.build();
     }
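Why writeBlobsIfCapacity checks hasCapacity() twice: a hypothetical timeline (illustrative sizes only):

    hasCapacity()  -> true    // size below the limit, so start writing
    writeTo(compressed)       // a large result spans several write() calls; the limit
                              // is crossed partway through, and later calls are dropped
    hasCapacity()  -> false   // the record on the stream may be incomplete

The possibly incomplete record is excluded from resultsCount, because counting it would make getBlobPaths() hit an EOFException when deserializing; its blobs are tracked in leakedBlobsCount instead.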
@@ -1736,7 +1788,17 @@ public Iterator<String> getBlobPaths() {
             throw new UncheckedIOException(e);
         }

+        if (leakedBlobsCount > 0) {
+            logger.warn(
+                "Skipped cleanup of {} dangling snapshot blobs due to memory constraints on the master node. "
+                    + "These blobs will be cleaned up automatically by future snapshot deletions. "
+                    + "If you routinely delete large snapshots, consider increasing the master node's heap size "
+                    + "to allow for more efficient cleanup.",
+                leakedBlobsCount
+            );
+        }
+
-        return Iterators.flatMap(Iterators.forRange(0, resultCount, i -> {
+        return Iterators.flatMap(Iterators.forRange(0, resultsCount, i -> {
             try {
                 return new ShardSnapshotMetaDeleteResult(input);
             } catch (IOException e) {
server/src/test/java/org/elasticsearch/common/io/stream/TruncatedOutputStreamTests.java (new file)

122 additions, 0 deletions

@@ -0,0 +1,122 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.common.io.stream;
+
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TruncatedOutputStreamTests extends ESTestCase {
+
+    public void testWriteSingleBytes() throws IOException {
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        int maxSize = randomIntBetween(0, 100);
+        TruncatedOutputStream truncatedOutputStream = new TruncatedOutputStream(
+            byteArrayOutputStream,
+            byteArrayOutputStream::size,
+            maxSize
+        );
+
+        byte[] values = new byte[maxSize];
+
+        // Write enough bytes to reach the defined maxSize
+        for (int i = 0; i < maxSize; i++) {
+            byte b = randomByte();
+            truncatedOutputStream.write(b);
+            values[i] = b;
+        }
+
+        // The stream should truncate now that it is full
+        for (int i = 0; i < randomIntBetween(0, 20); i++) {
+            truncatedOutputStream.write(randomByte());
+        }
+
+        assertArrayEquals(values, byteArrayOutputStream.toByteArray());
+    }
+
+    public void testWriteByteArray() throws IOException {
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        int maxSize = randomIntBetween(100, 200);
+        TruncatedOutputStream truncatedOutputStream = new TruncatedOutputStream(
+            byteArrayOutputStream,
+            byteArrayOutputStream::size,
+            maxSize
+        );
+
+        List<Byte> values = new ArrayList<>();
+        int bytesWritten = 0;
+        // Write beyond the stream's capacity
+        while (bytesWritten <= maxSize * 2) {
+            byte[] bytes = randomByteArrayOfLength(randomIntBetween(0, 20));
+            truncatedOutputStream.write(bytes);
+
+            // If there was capacity before writing, then the stream wrote the entire array,
+            // even if that meant overspilling
+            if (bytesWritten < maxSize) {
+                for (byte b : bytes) {
+                    values.add(b);
+                }
+            }
+
+            bytesWritten += bytes.length;
+        }
+
+        byte[] valuesAsByteArray = new byte[values.size()];
+        int i = 0;
+        for (byte b : values) {
+            valuesAsByteArray[i] = b;
+            i++;
+        }
+
+        assertArrayEquals(valuesAsByteArray, byteArrayOutputStream.toByteArray());
+    }
+
+    public void testWriteByteArrayWithOffsetAndLength() throws IOException {
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        int maxSize = randomIntBetween(100, 200);
+        TruncatedOutputStream truncatedOutputStream = new TruncatedOutputStream(
+            byteArrayOutputStream,
+            byteArrayOutputStream::size,
+            maxSize
+        );
+
+        List<Byte> values = new ArrayList<>();
+        int bytesWritten = 0;
+        // Write beyond the stream's capacity
+        while (bytesWritten <= maxSize * 2) {
+            byte[] bytes = randomByteArrayOfLength(randomIntBetween(0, 20));
+            int offset = randomIntBetween(0, bytes.length);
+            int length = randomIntBetween(0, bytes.length - offset);
+            truncatedOutputStream.write(bytes, offset, length);
+
+            // If there was capacity before writing, then the stream wrote the sub-array,
+            // even if that meant overspilling
+            if (bytesWritten < maxSize) {
+                for (int i = offset; i < offset + length; i++) {
+                    values.add(bytes[i]);
+                }
+            }
+
+            bytesWritten += length;
+        }
+
+        byte[] valuesAsByteArray = new byte[values.size()];
+        int i = 0;
+        for (byte b : values) {
+            valuesAsByteArray[i] = b;
+            i++;
+        }
+
+        assertArrayEquals(valuesAsByteArray, byteArrayOutputStream.toByteArray());
+    }
+}
