diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index 6e9cc114fa16a..4a202562e5e3d 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -660,20 +660,53 @@ public void maybeFetchRange( } } + /** + * Wraps the given {@link RangeMissingHandler} to adjust the position of the data read from the blob store + * (NB: the relativePos parameter in + * {@link RangeMissingHandler#fillCacheRange(SharedBytes.IO, int, SourceInputStreamFactory, int, int, IntConsumer, ActionListener)}) + * relative to the beginning of the region we're reading from. + * + * This is useful so that we can read the input stream we open for reading from the blob store + * from the beginning (i.e. position 0 in the input stream). + * + * For example, if we want to read 2000 bytes the blob store starting at position 1000, the writer here will + * adjust the relative position we read to be 0, the offset being 1000, and the input stream we open to + * read from the blob store will start streaming from position 1000 (but we adjusted the relative read position + * to 0 so we consume the input stream from the beginning). + */ private RangeMissingHandler writerWithOffset(RangeMissingHandler writer, int writeOffset) { if (writeOffset == 0) { // no need to allocate a new capturing lambda if the offset isn't adjusted return writer; } - return (channel, channelPos, streamFactory, relativePos, len, progressUpdater, completionListener) -> writer.fillCacheRange( - channel, - channelPos, - streamFactory, - relativePos - writeOffset, - len, - progressUpdater, - completionListener - ); + + return new RangeMissingHandler() { + @Override + public void fillCacheRange( + SharedBytes.IO channel, + int channelPos, + SourceInputStreamFactory streamFactory, + int relativePos, + int length, + IntConsumer progressUpdater, + ActionListener completionListener + ) throws IOException { + writer.fillCacheRange( + channel, + channelPos, + streamFactory, + relativePos - writeOffset, + length, + progressUpdater, + completionListener + ); + } + + @Override + public SourceInputStreamFactory sharedInputStreamFactory(List gaps) { + return writer.sharedInputStreamFactory(gaps); + } + }; } // used by tests