Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
9870591
Limit size of shardDeleteResults
joshua-adams-1 Aug 22, 2025
97e9969
[CI] Auto commit changes from spotless
Aug 26, 2025
24b7a62
Minor tweaks
joshua-adams-1 Aug 26, 2025
d888113
Merge branch 'limit-shard-blobs-to-delete' of github.com:joshua-adams…
joshua-adams-1 Aug 26, 2025
ee89eb2
Ran ./gradlew spotlessApply precommit
joshua-adams-1 Aug 26, 2025
92991b9
TBR - Add TODO
joshua-adams-1 Aug 27, 2025
3190772
Uses a setting to control the max `shardDeleteResults` size
joshua-adams-1 Sep 1, 2025
a16856c
Remove TODOs
joshua-adams-1 Sep 1, 2025
381d294
Fix failing unit tests
joshua-adams-1 Sep 1, 2025
203d513
Merge branch 'main' into limit-shard-blobs-to-delete
joshua-adams-1 Sep 1, 2025
daf09b6
Moved the limit logic out of the streams submodule and into
joshua-adams-1 Sep 3, 2025
dc70d5b
Merge branch 'main' of github.com:elastic/elasticsearch into limit-sh…
joshua-adams-1 Sep 3, 2025
0355c2a
Add tests
joshua-adams-1 Sep 4, 2025
f072128
Run ./gradlew spotlessApply precommit
joshua-adams-1 Sep 4, 2025
abb2d4c
Merge branch 'limit-shard-blobs-to-delete' of github.com:joshua-adams…
joshua-adams-1 Sep 4, 2025
a0d728f
Merge branch 'main' into limit-shard-blobs-to-delete
joshua-adams-1 Sep 4, 2025
654ebf2
Creates BoundedOutputStream
joshua-adams-1 Sep 11, 2025
5ef0111
Merge branch 'limit-shard-blobs-to-delete' of github.com:joshua-adams…
joshua-adams-1 Sep 11, 2025
bd9217b
Merge branch 'main' into limit-shard-blobs-to-delete
joshua-adams-1 Sep 11, 2025
ba81bcf
[CI] Auto commit changes from spotless
Sep 11, 2025
acd2182
Revert StreamOutput and delete tests
joshua-adams-1 Sep 11, 2025
be05a1f
Adds BoundedOutputStreamTests
joshua-adams-1 Sep 11, 2025
8d66c1e
Creates TruncatedOutputStream
joshua-adams-1 Sep 12, 2025
ce64bf5
Merge branch 'main' into limit-shard-blobs-to-delete
joshua-adams-1 Sep 12, 2025
0fa5099
Spotless commit
joshua-adams-1 Sep 12, 2025
3575240
Merge branch 'limit-shard-blobs-to-delete' of github.com:joshua-adams…
joshua-adams-1 Sep 12, 2025
d55893d
Add skippedResultsCount
joshua-adams-1 Sep 12, 2025
3725a3c
Rewrite the unit tests
joshua-adams-1 Sep 18, 2025
ed00f1a
Spotless apply
joshua-adams-1 Sep 18, 2025
ce6195d
Merge branch 'main' into limit-shard-blobs-to-delete
joshua-adams-1 Sep 18, 2025
37404a5
[CI] Update transport version definitions
Oct 2, 2025
fc41d60
Merge branch 'main' into limit-shard-blobs-to-delete
joshua-adams-1 Oct 2, 2025
d1e81f7
David comments
joshua-adams-1 Oct 7, 2025
2f0ea30
Fix test
joshua-adams-1 Oct 7, 2025
6babba9
Merge branch 'limit-shard-blobs-to-delete' of https://github.com/josh…
joshua-adams-1 Oct 7, 2025
1ff464d
Merge branch 'main' into limit-shard-blobs-to-delete
joshua-adams-1 Oct 7, 2025
0d01264
spotless
joshua-adams-1 Oct 7, 2025
d73ffef
Modify comment
joshua-adams-1 Oct 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class BytesStreamOutput extends BytesStream {
@Nullable
protected ByteArray bytes;
protected int count;
protected int maximumSize;

/**
* Create a non recycling {@link BytesStreamOutput} with an initial capacity of 0.
Expand All @@ -54,10 +55,15 @@ public BytesStreamOutput(int expectedSize) {
}

protected BytesStreamOutput(int expectedSize, BigArrays bigArrays) {
this(expectedSize, bigArrays, Integer.MAX_VALUE);
}

protected BytesStreamOutput(int expectedSize, BigArrays bigArrays, int maximumSize) {
this.bigArrays = bigArrays;
if (expectedSize != 0) {
this.bytes = bigArrays.newByteArray(expectedSize, false);
}
this.maximumSize = maximumSize;
}

@Override
Expand Down Expand Up @@ -171,8 +177,8 @@ private static void copyToArray(BytesReference bytesReference, byte[] arr) {
}

protected void ensureCapacity(long offset) {
if (offset > Integer.MAX_VALUE) {
throw new IllegalArgumentException(getClass().getSimpleName() + " cannot hold more than 2GB of data");
if (offset > this.maximumSize) {
throw new IllegalArgumentException(getClass().getSimpleName() + " has exceeded it's max size of " + this.maximumSize);
}
if (bytes == null) {
this.bytes = bigArrays.newByteArray(BigArrays.overSize(offset, PageCacheRecycler.PAGE_SIZE_IN_BYTES, 1), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public ReleasableBytesStreamOutput(BigArrays bigarrays) {
this(PageCacheRecycler.PAGE_SIZE_IN_BYTES, bigarrays);
}

public ReleasableBytesStreamOutput(BigArrays bigArrays, int maximumSize) {
super(PageCacheRecycler.PAGE_SIZE_IN_BYTES, bigArrays, maximumSize);
}

public ReleasableBytesStreamOutput(int expectedSize, BigArrays bigArrays) {
super(expectedSize, bigArrays);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -1477,6 +1480,7 @@ private void cleanupUnlinkedShardLevelBlobs(ActionListener<Void> listener) {
listener.onResponse(null);
return;
}

snapshotExecutor.execute(ActionRunnable.wrap(listener, l -> {
try {
deleteFromContainer(OperationPurpose.SNAPSHOT_DATA, blobContainer(), filesToDelete);
Expand Down Expand Up @@ -1678,26 +1682,66 @@ void writeTo(StreamOutput out) throws IOException {
* need no further synchronization.
* </p>
*/
// If the size of this continues to be a problem even after compression, consider either a hard limit on its size (preferring leaked
// blobs over an OOME on the master) or else offloading it to disk or to the repository itself.
private final BytesStreamOutput shardDeleteResults = new ReleasableBytesStreamOutput(bigArrays);
private final BytesStreamOutput shardDeleteResults;

private int resultCount = 0;

private final StreamOutput compressed = new OutputStreamStreamOutput(
new BufferedOutputStream(
new DeflaterOutputStream(Streams.flushOnCloseStream(shardDeleteResults)),
DeflateCompressor.BUFFER_SIZE
)
);
private final StreamOutput compressed;

private final ArrayList<Closeable> resources = new ArrayList<>();

private final ShardGenerations.Builder shardGenerationsBuilder = ShardGenerations.builder();

ShardBlobsToDelete() {
resources.add(compressed);
resources.add(LeakTracker.wrap((Releasable) shardDeleteResults));
int maxSizeOfShardDeleteResults = calculateMaximumShardDeleteResultsSize();

if (maxSizeOfShardDeleteResults > 0) {
this.shardDeleteResults = new ReleasableBytesStreamOutput(bigArrays, maxSizeOfShardDeleteResults);
this.compressed = new OutputStreamStreamOutput(
new BufferedOutputStream(
new DeflaterOutputStream(Streams.flushOnCloseStream(shardDeleteResults)),
DeflateCompressor.BUFFER_SIZE
)
);
resources.add(compressed);
resources.add(LeakTracker.wrap((Releasable) shardDeleteResults));
} else {
this.shardDeleteResults = null;
this.compressed = null;
}

}

MemoryMXBean getMemoryMXBean() {
return ManagementFactory.getMemoryMXBean();
}

/**
* Calculates the maximum size of the shardDeleteResults BytesStreamOutput
* The size should at most be 2GB, but no more than 25% of the total remaining heap space
* @return The maximum number of bytes the shardDeleteResults BytesStreamOutput can consume in the heap
*/
int calculateMaximumShardDeleteResultsSize() {
MemoryMXBean memoryBean = getMemoryMXBean();
MemoryUsage heapUsage = memoryBean.getHeapMemoryUsage();

long heapUsageInBytes = heapUsage.getUsed();
long maxHeapUsageInBytes = heapUsage.getMax();

// Undefined heap
if (maxHeapUsageInBytes == -1) {
logger.warn("The heap is undefined when attempting to instantiate shardDeleteResults");
return 0;
}

long heapSpaceLeftInBytes = maxHeapUsageInBytes - heapUsageInBytes;
double maxPercentageHeapSpaceAllocation = 0.25;
long heapSpaceLeftToBeUtilisedInBytes = (long) Math.floor(heapSpaceLeftInBytes * maxPercentageHeapSpaceAllocation);
if (heapSpaceLeftToBeUtilisedInBytes > Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
} else {
return (int) heapSpaceLeftInBytes;
}
}

synchronized void addShardDeleteResult(
Expand All @@ -1706,6 +1750,11 @@ synchronized void addShardDeleteResult(
ShardGeneration newGeneration,
Collection<String> blobsToDelete
) {
if (compressed == null) {
// No output stream: skip writing, but still update generations
shardGenerationsBuilder.put(indexId, shardId, newGeneration);
return;
}
try {
shardGenerationsBuilder.put(indexId, shardId, newGeneration);
new ShardSnapshotMetaDeleteResult(Objects.requireNonNull(indexId.getId()), shardId, blobsToDelete).writeTo(compressed);
Expand All @@ -1714,13 +1763,18 @@ synchronized void addShardDeleteResult(
assert false : e; // no IO actually happens here
throw new UncheckedIOException(e);
}
// TODO - Catch the out of memory error
}

public ShardGenerations getUpdatedShardGenerations() {
return shardGenerationsBuilder.build();
}

public Iterator<String> getBlobPaths() {
if (compressed == null || shardDeleteResults == null) {
// No output stream: nothing to return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise here I don't think we should change anything with respect to these values being null.

return Collections.emptyIterator();
}
final StreamInput input;
try {
compressed.close();
Expand Down Expand Up @@ -1750,6 +1804,9 @@ public Iterator<String> getBlobPaths() {

@Override
public void close() {
if (resources.isEmpty()) {
return;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise here, let's always track these resources even if the limit is zero.

try {
IOUtils.close(resources);
} catch (IOException e) {
Expand All @@ -1760,7 +1817,7 @@ public void close() {

// exposed for tests
int sizeInBytes() {
return shardDeleteResults.size();
return shardDeleteResults == null ? 0 : shardDeleteResults.size();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.lucene.BytesRefs;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -63,7 +64,7 @@ public void testEmpty() throws Exception {
out.close();
}

public void testSingleByte() throws Exception {
public void testSingleByte() {
TestStreamOutput out = new TestStreamOutput();
assertEquals(0, out.size());

Expand All @@ -78,7 +79,7 @@ public void testSingleByte() throws Exception {
out.close();
}

public void testSingleShortPage() throws Exception {
public void testSingleShortPage() {
TestStreamOutput out = new TestStreamOutput();

int expectedSize = 10;
Expand All @@ -95,7 +96,7 @@ public void testSingleShortPage() throws Exception {
out.close();
}

public void testIllegalBulkWrite() throws Exception {
public void testIllegalBulkWrite() {
TestStreamOutput out = new TestStreamOutput();

// bulk-write with wrong args
Expand Down Expand Up @@ -138,7 +139,7 @@ public void testSingleFullPageBulkWrite() throws Exception {
out.close();
}

public void testSingleFullPageBulkWriteWithOffset() throws Exception {
public void testSingleFullPageBulkWriteWithOffset() {
TestStreamOutput out = new TestStreamOutput();

int initialOffset = 10;
Expand All @@ -157,7 +158,7 @@ public void testSingleFullPageBulkWriteWithOffset() throws Exception {
out.close();
}

public void testSingleFullPageBulkWriteWithOffsetCrossover() throws Exception {
public void testSingleFullPageBulkWriteWithOffsetCrossover() {
TestStreamOutput out = new TestStreamOutput();

int initialOffset = 10;
Expand All @@ -176,7 +177,7 @@ public void testSingleFullPageBulkWriteWithOffsetCrossover() throws Exception {
out.close();
}

public void testSingleFullPage() throws Exception {
public void testSingleFullPage() {
TestStreamOutput out = new TestStreamOutput();

int expectedSize = PageCacheRecycler.BYTE_PAGE_SIZE;
Expand All @@ -193,7 +194,7 @@ public void testSingleFullPage() throws Exception {
out.close();
}

public void testOneFullOneShortPage() throws Exception {
public void testOneFullOneShortPage() {
TestStreamOutput out = new TestStreamOutput();

int expectedSize = PageCacheRecycler.BYTE_PAGE_SIZE + 10;
Expand All @@ -210,7 +211,7 @@ public void testOneFullOneShortPage() throws Exception {
out.close();
}

public void testTwoFullOneShortPage() throws Exception {
public void testTwoFullOneShortPage() {
TestStreamOutput out = new TestStreamOutput();

int expectedSize = (PageCacheRecycler.BYTE_PAGE_SIZE * 2) + 1;
Expand All @@ -227,8 +228,9 @@ public void testTwoFullOneShortPage() throws Exception {
out.close();
}

public void testSeek() throws Exception {
BytesStreamOutput out = new BytesStreamOutput();
public void testSeek() {
int maximumSize = randomIntBetween(0, Integer.MAX_VALUE);
BytesStreamOutput out = new BytesStreamOutput(0, BigArrays.NON_RECYCLING_INSTANCE, maximumSize);

int position = 0;
assertEquals(position, out.position());
Expand All @@ -241,13 +243,14 @@ public void testSeek() throws Exception {
assertEquals(position, BytesReference.toBytes(out.bytes()).length);

IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> out.seek(Integer.MAX_VALUE + 1L));
assertEquals("BytesStreamOutput cannot hold more than 2GB of data", iae.getMessage());
assertEquals("BytesStreamOutput has exceeded it's max size of " + maximumSize, iae.getMessage());

out.close();
}

public void testSkip() throws Exception {
BytesStreamOutput out = new BytesStreamOutput();
public void testSkip() {
int maximumSize = randomIntBetween(0, Integer.MAX_VALUE);
BytesStreamOutput out = new BytesStreamOutput(0, BigArrays.NON_RECYCLING_INSTANCE, maximumSize);

int position = 0;
assertEquals(position, out.position());
Expand All @@ -256,8 +259,8 @@ public void testSkip() throws Exception {
out.skip(forward);
assertEquals(position + forward, out.position());

IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> out.skip(Integer.MAX_VALUE - 50));
assertEquals("BytesStreamOutput cannot hold more than 2GB of data", iae.getMessage());
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> out.skip(maximumSize - 50));
assertEquals("BytesStreamOutput has exceeded it's max size of " + maximumSize, iae.getMessage());

out.close();
}
Expand Down
Loading