From 0c41d2936138393103cf290a8c1d5c8eadb745cd Mon Sep 17 00:00:00 2001 From: Andrei Dan Date: Wed, 16 Apr 2025 18:39:52 +0300 Subject: [PATCH 1/3] Make writerWithOffset fully delegate to the writer it wraps writerWithOffset uses a lambda to create a RangeMissingHandler however, the RangeMissingHandler interface has a default implementation for `sharedInputStreamFactory`. This makes `writerWithOffset` delegate to the received writer only for the `fillCacheRange` method where the writer itself perhaps didn't have the `sharedInputStream` method invoked (always invoking `sharedInputStream` before `fillCacheRange` is part of the contract of the RangeMissingHandler interface) This PR makes `writerWithOffset` delegate the `sharedInputStream` to the underlying writer. --- .../shared/SharedBlobCacheService.java | 37 ++++++++++++++----- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index ed7965b85a36a..8c5880d3bace7 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -648,15 +648,34 @@ private RangeMissingHandler writerWithOffset(RangeMissingHandler writer, int wri // no need to allocate a new capturing lambda if the offset isn't adjusted return writer; } - return (channel, channelPos, streamFactory, relativePos, len, progressUpdater, completionListener) -> writer.fillCacheRange( - channel, - channelPos, - streamFactory, - relativePos - writeOffset, - len, - progressUpdater, - completionListener - ); + + return new RangeMissingHandler() { + @Override + public void fillCacheRange( + SharedBytes.IO channel, + int channelPos, + SourceInputStreamFactory streamFactory, + int relativePos, + int length, + IntConsumer progressUpdater, + ActionListener completionListener + ) throws IOException { + writer.fillCacheRange( + channel, + channelPos, + streamFactory, + relativePos - writeOffset, + length, + progressUpdater, + completionListener + ); + } + + @Override + public SourceInputStreamFactory sharedInputStreamFactory(List gaps) { + return writer.sharedInputStreamFactory(gaps); + } + }; } // used by tests From f89c72bf4bdf06e3ff63aca03222d02a539c5551 Mon Sep 17 00:00:00 2001 From: Andrei Dan Date: Thu, 29 May 2025 14:30:04 +0100 Subject: [PATCH 2/3] add clarifying (hopefully) docs regarding the writerWithOffset --- .../blobcache/shared/SharedBlobCacheService.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index 26923af4f03a3..199d2b58b90ea 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -660,6 +660,19 @@ public void maybeFetchRange( } } + /** + * Wraps the given {@link RangeMissingHandler} to adjust the position of the data read from the blob store + * (NB: the relativePos parameter in {@link RangeMissingHandler#fillCacheRange(SharedBytes.IO, int, SourceInputStreamFactory, int, int, IntConsumer, ActionListener)}) + * relative to the beginning of the region we're reading from. + * + * This is useful so that we can read the input stream we open for reading from the blob store + * from the beginning (i.e. position 0 in the input stream). + * + * For example, if we want to read 2000 bytes the blob store starting at position 1000, the writer here will + * adjust the relative position we read to be 0, the offset being 1000, and the input stream we open to + * read from the blob store will start streaming from position 1000 (but we adjusted the relative read position + * to 0 so we consume the input stream from the beginning). + */ private RangeMissingHandler writerWithOffset(RangeMissingHandler writer, int writeOffset) { if (writeOffset == 0) { // no need to allocate a new capturing lambda if the offset isn't adjusted From 7a3c41259f703a593310e820523aa8c567e0266b Mon Sep 17 00:00:00 2001 From: Andrei Dan Date: Thu, 29 May 2025 14:42:54 +0100 Subject: [PATCH 3/3] line length --- .../elasticsearch/blobcache/shared/SharedBlobCacheService.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index 199d2b58b90ea..4a202562e5e3d 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -662,7 +662,8 @@ public void maybeFetchRange( /** * Wraps the given {@link RangeMissingHandler} to adjust the position of the data read from the blob store - * (NB: the relativePos parameter in {@link RangeMissingHandler#fillCacheRange(SharedBytes.IO, int, SourceInputStreamFactory, int, int, IntConsumer, ActionListener)}) + * (NB: the relativePos parameter in + * {@link RangeMissingHandler#fillCacheRange(SharedBytes.IO, int, SourceInputStreamFactory, int, int, IntConsumer, ActionListener)}) * relative to the beginning of the region we're reading from. * * This is useful so that we can read the input stream we open for reading from the blob store