Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -492,8 +492,7 @@ CacheFileRegion<KeyType> get(KeyType cacheKey, long fileLength, int region) {
}

/**
* Fetch and cache the full blob for the given cache entry from the remote repository if there
* are enough free pages in the cache to do so.
* Fetch and write in cache a region of a blob if there are enough free pages in the cache to do so.
* <p>
* This method returns as soon as the download tasks are instantiated, but the tasks themselves
* are run on the bulk executor.
Expand All @@ -502,67 +501,32 @@ CacheFileRegion<KeyType> get(KeyType cacheKey, long fileLength, int region) {
* and unlinked
*
* @param cacheKey the key to fetch data for
* @param length the length of the blob to fetch
* @param region the region of the blob to fetch
* @param blobLength the length of the blob from which the region is fetched (used to compute the size of the ending region)
* @param writer a writer that handles writing of newly downloaded data to the shared cache
* @param fetchExecutor an executor to use for reading from the blob store
* @param listener listener that is called once all downloading has finished
* @return {@code true} if there were enough free pages to start downloading the full entry
* @param listener a listener that is completed with {@code true} if the current thread triggered the fetching of the region, in
* which case the data is available in cache. The listener is completed with {@code false} in every other case: if
* the region to write is already available in cache, if the region is pending fetching via another thread or if
* there are not enough free pages to fetch the region.
*/
public boolean maybeFetchFullEntry(
KeyType cacheKey,
long length,
RangeMissingHandler writer,
Executor fetchExecutor,
ActionListener<Void> listener
public void maybeFetchRegion(
final KeyType cacheKey,
final int region,
final long blobLength,
final RangeMissingHandler writer,
final Executor fetchExecutor,
final ActionListener<Boolean> listener
) {
int finalRegion = getEndingRegion(length);
// TODO freeRegionCount uses freeRegions.size() which is NOT a constant-time operation. Can we do better?
if (freeRegionCount() < finalRegion) {
// Not enough room to download a full file without evicting existing data, so abort
listener.onResponse(null);
return false;
}
long regionLength = regionSize;
try (RefCountingListener refCountingListener = new RefCountingListener(listener)) {
for (int region = 0; region <= finalRegion; region++) {
if (region == finalRegion) {
regionLength = length - getRegionStart(region);
}
ByteRange rangeToWrite = ByteRange.of(0, regionLength);
if (rangeToWrite.isEmpty()) {
return true;
}
final ActionListener<Integer> regionListener = refCountingListener.acquire(ignored -> {});
final CacheFileRegion<KeyType> entry;
try {
entry = get(cacheKey, length, region);
} catch (AlreadyClosedException e) {
// failed to grab a cache page because some other operation concurrently acquired some
regionListener.onResponse(0);
return false;
}
// set read range == write range so the listener completes only once all the bytes have been downloaded
entry.populateAndRead(
rangeToWrite,
rangeToWrite,
(channel, pos, relativePos, len) -> Math.toIntExact(len),
writer,
fetchExecutor,
regionListener.delegateResponse((l, e) -> {
if (e instanceof AlreadyClosedException) {
l.onResponse(0);
} else {
l.onFailure(e);
}
})
);
}
}
return true;
fetchRegion(cacheKey, region, blobLength, writer, fetchExecutor, false, listener);
}

/**
* Fetch and write in cache a region of a blob if there are enough free pages in the cache to do so.
* Fetch and write in cache a region of a blob.
* <p>
* If {@code force} is {@code true} and no free regions remain, an existing region will be evicted to make room.
* </p>
*
* <p>
* This method returns as soon as the download tasks are instantiated, but the tasks themselves
* are run on the bulk executor.
Expand All @@ -575,20 +539,23 @@ public boolean maybeFetchFullEntry(
* @param blobLength the length of the blob from which the region is fetched (used to compute the size of the ending region)
* @param writer a writer that handles writing of newly downloaded data to the shared cache
* @param fetchExecutor an executor to use for reading from the blob store
* @param force flag indicating whether the cache should free an occupied region to accommodate the requested
* region when none are free.
* @param listener a listener that is completed with {@code true} if the current thread triggered the fetching of the region, in
* which case the data is available in cache. The listener is completed with {@code false} in every other case: if
* the region to write is already available in cache, if the region is pending fetching via another thread or if
* there are not enough free pages to fetch the region.
*/
public void maybeFetchRegion(
public void fetchRegion(
final KeyType cacheKey,
final int region,
final long blobLength,
final RangeMissingHandler writer,
final Executor fetchExecutor,
final boolean force,
final ActionListener<Boolean> listener
) {
if (freeRegions.isEmpty() && maybeEvictLeastUsed() == false) {
if (force == false && freeRegions.isEmpty() && maybeEvictLeastUsed() == false) {
// no free page available and no old enough unused region to be evicted
logger.info("No free regions, skipping loading region [{}]", region);
listener.onResponse(false);
Expand Down Expand Up @@ -636,7 +603,45 @@ public void maybeFetchRange(
final Executor fetchExecutor,
final ActionListener<Boolean> listener
) {
if (freeRegions.isEmpty() && maybeEvictLeastUsed() == false) {
fetchRange(cacheKey, region, range, blobLength, writer, fetchExecutor, false, listener);
}

/**
* Fetch and write in cache a range within a blob region.
* <p>
* If {@code force} is {@code true} and no free regions remain, an existing region will be evicted to make room.
* </p>
* <p>
* This method returns as soon as the download tasks are instantiated, but the tasks themselves
* are run on the fetch executor.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* are run on the bulk executor.
* are run on the fetch executor.

* <p>
* If an exception is thrown from the writer then the cache entry being downloaded is freed
* and unlinked
*
* @param cacheKey the key to fetch data for
* @param region the region of the blob
* @param range the range of the blob to fetch
* @param blobLength the length of the blob from which the region is fetched (used to compute the size of the ending region)
* @param writer a writer that handles writing of newly downloaded data to the shared cache
* @param fetchExecutor an executor to use for reading from the blob store
* @param force flag indicating whether the cache should free an occupied region to accommodate the requested
* range when none are free.
* @param listener a listener that is completed with {@code true} if the current thread triggered the fetching of the range, in
* which case the data is available in cache. The listener is completed with {@code false} in every other case: if
* the range to write is already available in cache, if the range is pending fetching via another thread or if
* there are not enough free pages to fetch the range.
*/
public void fetchRange(
final KeyType cacheKey,
final int region,
final ByteRange range,
final long blobLength,
final RangeMissingHandler writer,
final Executor fetchExecutor,
final boolean force,
final ActionListener<Boolean> listener
) {
if (force == false && freeRegions.isEmpty() && maybeEvictLeastUsed() == false) {
// no free page available and no old enough unused region to be evicted
logger.info("No free regions, skipping loading region [{}]", region);
listener.onResponse(false);
Expand Down Expand Up @@ -723,8 +728,6 @@ private static void throwAlreadyClosed(String message) {

/**
* NOTE: Method is package private mostly to allow checking the number of free regions in tests.
* However, it is also used by {@link SharedBlobCacheService#maybeFetchFullEntry} but we should try
* to move away from that because calling "size" on a ConcurrentLinkedQueue is not a constant time operation.
*/
int freeRegionCount() {
return freeRegions.size();
Expand Down
Loading