Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -660,20 +660,53 @@ public void maybeFetchRange(
}
}

/**
* Wraps the given {@link RangeMissingHandler} to adjust the position of the data read from the blob store
* (NB: the relativePos parameter in
* {@link RangeMissingHandler#fillCacheRange(SharedBytes.IO, int, SourceInputStreamFactory, int, int, IntConsumer, ActionListener)})
* relative to the beginning of the region we're reading from.
*
* This is useful so that we can read the input stream we open for reading from the blob store
* from the beginning (i.e. position 0 <b>in the input stream</b>).
*
* For example, if we want to read 2000 bytes the blob store starting at position 1000, the writer here will
* adjust the relative position we read to be 0, the offset being 1000, and the input stream we open to
* read from the blob store will start streaming from position 1000 (but we adjusted the relative read position
* to 0 so we consume the input stream from the beginning).
*/
private RangeMissingHandler writerWithOffset(RangeMissingHandler writer, int writeOffset) {
if (writeOffset == 0) {
// no need to allocate a new capturing lambda if the offset isn't adjusted
return writer;
}
return (channel, channelPos, streamFactory, relativePos, len, progressUpdater, completionListener) -> writer.fillCacheRange(
channel,
channelPos,
streamFactory,
relativePos - writeOffset,
len,
progressUpdater,
completionListener
);

return new RangeMissingHandler() {
@Override
public void fillCacheRange(
SharedBytes.IO channel,
int channelPos,
SourceInputStreamFactory streamFactory,
int relativePos,
int length,
IntConsumer progressUpdater,
ActionListener<Void> completionListener
) throws IOException {
writer.fillCacheRange(
channel,
channelPos,
streamFactory,
relativePos - writeOffset,
length,
progressUpdater,
completionListener
);
}

@Override
public SourceInputStreamFactory sharedInputStreamFactory(List<SparseFileTracker.Gap> gaps) {
return writer.sharedInputStreamFactory(gaps);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this works - since it does not apply the offset in that case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, would the relativePos - writeOffset parameter passed in fillCacheRange not do the trick here?
My understanding on sharedInputStreamFactory is that it has to do with how much we can parallelise the calls to fillCacheRange but the place we read from is the relativePos passed in (which will take the offset into account as it'll be relativePos - writeOffset?

We could return null after calling writer.sharedInputStreamFactory(gaps) here if you think it's better? i.e.:

writer.sharedInputStreamFactory(gaps);
return null

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want to return the shard input stream factory there.

Is there a test demonstrating that it works?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My link above still only overrides the fill cache method. I'll add one more test that calls maybeFetchRange with a real SequentialRangeMissingHandler instance. Thanks Henning!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@henningandersen adding a unit test with SequentialRangeMissingHandler proven to be quite the mission (we'd end up mostly mocking things) so I added an integration test instead in https://github.com/elastic/elasticsearch-serverless/pull/3853

}
};
}

// used by tests
Expand Down