Skip to content

Commit 2375e89

Browse files
authored
Make writerWithOffset fully delegate to the writer it wraps (#126937)
writerWithOffset uses a lambda to create a RangeMissingHandler however, the RangeMissingHandler interface has a default implementation for `sharedInputStreamFactory`. This makes `writerWithOffset` delegate to the received writer only for the `fillCacheRange` method where the writer itself perhaps didn't have the `sharedInputStream` method invoked (always invoking `sharedInputStream` before `fillCacheRange` is part of the contract of the RangeMissingHandler interface) This PR makes `writerWithOffset` delegate the `sharedInputStream` to the underlying writer.
1 parent dc2fbe1 commit 2375e89

File tree

1 file changed

+42
-9
lines changed

1 file changed

+42
-9
lines changed

x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -660,20 +660,53 @@ public void maybeFetchRange(
660660
}
661661
}
662662

663+
/**
664+
* Wraps the given {@link RangeMissingHandler} to adjust the position of the data read from the blob store
665+
* (NB: the relativePos parameter in
666+
* {@link RangeMissingHandler#fillCacheRange(SharedBytes.IO, int, SourceInputStreamFactory, int, int, IntConsumer, ActionListener)})
667+
* relative to the beginning of the region we're reading from.
668+
*
669+
* This is useful so that we can read the input stream we open for reading from the blob store
670+
* from the beginning (i.e. position 0 <b>in the input stream</b>).
671+
*
672+
* For example, if we want to read 2000 bytes the blob store starting at position 1000, the writer here will
673+
* adjust the relative position we read to be 0, the offset being 1000, and the input stream we open to
674+
* read from the blob store will start streaming from position 1000 (but we adjusted the relative read position
675+
* to 0 so we consume the input stream from the beginning).
676+
*/
663677
private RangeMissingHandler writerWithOffset(RangeMissingHandler writer, int writeOffset) {
664678
if (writeOffset == 0) {
665679
// no need to allocate a new capturing lambda if the offset isn't adjusted
666680
return writer;
667681
}
668-
return (channel, channelPos, streamFactory, relativePos, len, progressUpdater, completionListener) -> writer.fillCacheRange(
669-
channel,
670-
channelPos,
671-
streamFactory,
672-
relativePos - writeOffset,
673-
len,
674-
progressUpdater,
675-
completionListener
676-
);
682+
683+
return new RangeMissingHandler() {
684+
@Override
685+
public void fillCacheRange(
686+
SharedBytes.IO channel,
687+
int channelPos,
688+
SourceInputStreamFactory streamFactory,
689+
int relativePos,
690+
int length,
691+
IntConsumer progressUpdater,
692+
ActionListener<Void> completionListener
693+
) throws IOException {
694+
writer.fillCacheRange(
695+
channel,
696+
channelPos,
697+
streamFactory,
698+
relativePos - writeOffset,
699+
length,
700+
progressUpdater,
701+
completionListener
702+
);
703+
}
704+
705+
@Override
706+
public SourceInputStreamFactory sharedInputStreamFactory(List<SparseFileTracker.Gap> gaps) {
707+
return writer.sharedInputStreamFactory(gaps);
708+
}
709+
};
677710
}
678711

679712
// used by tests

0 commit comments

Comments
 (0)