Skip to content

Commit 7c3a9ce

Browse files
fcofdezmridula-s109
authored andcommitted
Add fetchRange and fetchRegion to SharedBlobCacheService (elastic#131000)
This commit adds two new methods to SharedBlobCacheService, fetchRange and fetchRegion. This new methods provide the same functionality as their maybe version, but the main difference is that they take a new boolean parameter (force) that allows the caller to specify if fetching the range/region should force an eviction if there are no free regions available.
1 parent 3e72289 commit 7c3a9ce

File tree

2 files changed

+455
-194
lines changed

2 files changed

+455
-194
lines changed

x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java

Lines changed: 64 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -492,8 +492,7 @@ CacheFileRegion<KeyType> get(KeyType cacheKey, long fileLength, int region) {
492492
}
493493

494494
/**
495-
* Fetch and cache the full blob for the given cache entry from the remote repository if there
496-
* are enough free pages in the cache to do so.
495+
* Fetch and write in cache a region of a blob if there are enough free pages in the cache to do so.
497496
* <p>
498497
* This method returns as soon as the download tasks are instantiated, but the tasks themselves
499498
* are run on the bulk executor.
@@ -502,67 +501,32 @@ CacheFileRegion<KeyType> get(KeyType cacheKey, long fileLength, int region) {
502501
* and unlinked
503502
*
504503
* @param cacheKey the key to fetch data for
505-
* @param length the length of the blob to fetch
504+
* @param region the region of the blob to fetch
505+
* @param blobLength the length of the blob from which the region is fetched (used to compute the size of the ending region)
506506
* @param writer a writer that handles writing of newly downloaded data to the shared cache
507507
* @param fetchExecutor an executor to use for reading from the blob store
508-
* @param listener listener that is called once all downloading has finished
509-
* @return {@code true} if there were enough free pages to start downloading the full entry
508+
* @param listener a listener that is completed with {@code true} if the current thread triggered the fetching of the region, in
509+
* which case the data is available in cache. The listener is completed with {@code false} in every other cases: if
510+
* the region to write is already available in cache, if the region is pending fetching via another thread or if
511+
* there is not enough free pages to fetch the region.
510512
*/
511-
public boolean maybeFetchFullEntry(
512-
KeyType cacheKey,
513-
long length,
514-
RangeMissingHandler writer,
515-
Executor fetchExecutor,
516-
ActionListener<Void> listener
513+
public void maybeFetchRegion(
514+
final KeyType cacheKey,
515+
final int region,
516+
final long blobLength,
517+
final RangeMissingHandler writer,
518+
final Executor fetchExecutor,
519+
final ActionListener<Boolean> listener
517520
) {
518-
int finalRegion = getEndingRegion(length);
519-
// TODO freeRegionCount uses freeRegions.size() which is is NOT a constant-time operation. Can we do better?
520-
if (freeRegionCount() < finalRegion) {
521-
// Not enough room to download a full file without evicting existing data, so abort
522-
listener.onResponse(null);
523-
return false;
524-
}
525-
long regionLength = regionSize;
526-
try (RefCountingListener refCountingListener = new RefCountingListener(listener)) {
527-
for (int region = 0; region <= finalRegion; region++) {
528-
if (region == finalRegion) {
529-
regionLength = length - getRegionStart(region);
530-
}
531-
ByteRange rangeToWrite = ByteRange.of(0, regionLength);
532-
if (rangeToWrite.isEmpty()) {
533-
return true;
534-
}
535-
final ActionListener<Integer> regionListener = refCountingListener.acquire(ignored -> {});
536-
final CacheFileRegion<KeyType> entry;
537-
try {
538-
entry = get(cacheKey, length, region);
539-
} catch (AlreadyClosedException e) {
540-
// failed to grab a cache page because some other operation concurrently acquired some
541-
regionListener.onResponse(0);
542-
return false;
543-
}
544-
// set read range == write range so the listener completes only once all the bytes have been downloaded
545-
entry.populateAndRead(
546-
rangeToWrite,
547-
rangeToWrite,
548-
(channel, pos, relativePos, len) -> Math.toIntExact(len),
549-
writer,
550-
fetchExecutor,
551-
regionListener.delegateResponse((l, e) -> {
552-
if (e instanceof AlreadyClosedException) {
553-
l.onResponse(0);
554-
} else {
555-
l.onFailure(e);
556-
}
557-
})
558-
);
559-
}
560-
}
561-
return true;
521+
fetchRegion(cacheKey, region, blobLength, writer, fetchExecutor, false, listener);
562522
}
563523

564524
/**
565-
* Fetch and write in cache a region of a blob if there are enough free pages in the cache to do so.
525+
* Fetch and write in cache a region of a blob.
526+
* <p>
527+
* If {@code force} is {@code true} and no free regions remain, an existing region will be evicted to make room.
528+
* </p>
529+
*
566530
* <p>
567531
* This method returns as soon as the download tasks are instantiated, but the tasks themselves
568532
* are run on the bulk executor.
@@ -575,20 +539,23 @@ public boolean maybeFetchFullEntry(
575539
* @param blobLength the length of the blob from which the region is fetched (used to compute the size of the ending region)
576540
* @param writer a writer that handles writing of newly downloaded data to the shared cache
577541
* @param fetchExecutor an executor to use for reading from the blob store
542+
* @param force flag indicating whether the cache should free an occupied region to accommodate the requested
543+
* region when none are free.
578544
* @param listener a listener that is completed with {@code true} if the current thread triggered the fetching of the region, in
579545
* which case the data is available in cache. The listener is completed with {@code false} in every other cases: if
580546
* the region to write is already available in cache, if the region is pending fetching via another thread or if
581547
* there is not enough free pages to fetch the region.
582548
*/
583-
public void maybeFetchRegion(
549+
public void fetchRegion(
584550
final KeyType cacheKey,
585551
final int region,
586552
final long blobLength,
587553
final RangeMissingHandler writer,
588554
final Executor fetchExecutor,
555+
final boolean force,
589556
final ActionListener<Boolean> listener
590557
) {
591-
if (freeRegions.isEmpty() && maybeEvictLeastUsed() == false) {
558+
if (force == false && freeRegions.isEmpty() && maybeEvictLeastUsed() == false) {
592559
// no free page available and no old enough unused region to be evicted
593560
logger.info("No free regions, skipping loading region [{}]", region);
594561
listener.onResponse(false);
@@ -636,7 +603,45 @@ public void maybeFetchRange(
636603
final Executor fetchExecutor,
637604
final ActionListener<Boolean> listener
638605
) {
639-
if (freeRegions.isEmpty() && maybeEvictLeastUsed() == false) {
606+
fetchRange(cacheKey, region, range, blobLength, writer, fetchExecutor, false, listener);
607+
}
608+
609+
/**
610+
* Fetch and write in cache a range within a blob region.
611+
* <p>
612+
* If {@code force} is {@code true} and no free regions remain, an existing region will be evicted to make room.
613+
* </p>
614+
* <p>
615+
* This method returns as soon as the download tasks are instantiated, but the tasks themselves
616+
* are run on the bulk executor.
617+
* <p>
618+
* If an exception is thrown from the writer then the cache entry being downloaded is freed
619+
* and unlinked
620+
*
621+
* @param cacheKey the key to fetch data for
622+
* @param region the region of the blob
623+
* @param range the range of the blob to fetch
624+
* @param blobLength the length of the blob from which the region is fetched (used to compute the size of the ending region)
625+
* @param writer a writer that handles writing of newly downloaded data to the shared cache
626+
* @param fetchExecutor an executor to use for reading from the blob store
627+
* @param force flag indicating whether the cache should free an occupied region to accommodate the requested
628+
* range when none are free.
629+
* @param listener a listener that is completed with {@code true} if the current thread triggered the fetching of the range, in
630+
* which case the data is available in cache. The listener is completed with {@code false} in every other cases: if
631+
* the range to write is already available in cache, if the range is pending fetching via another thread or if
632+
* there is not enough free pages to fetch the range.
633+
*/
634+
public void fetchRange(
635+
final KeyType cacheKey,
636+
final int region,
637+
final ByteRange range,
638+
final long blobLength,
639+
final RangeMissingHandler writer,
640+
final Executor fetchExecutor,
641+
final boolean force,
642+
final ActionListener<Boolean> listener
643+
) {
644+
if (force == false && freeRegions.isEmpty() && maybeEvictLeastUsed() == false) {
640645
// no free page available and no old enough unused region to be evicted
641646
logger.info("No free regions, skipping loading region [{}]", region);
642647
listener.onResponse(false);
@@ -723,8 +728,6 @@ private static void throwAlreadyClosed(String message) {
723728

724729
/**
725730
* NOTE: Method is package private mostly to allow checking the number of fee regions in tests.
726-
* However, it is also used by {@link SharedBlobCacheService#maybeFetchFullEntry} but we should try
727-
* to move away from that because calling "size" on a ConcurrentLinkedQueue is not a constant time operation.
728731
*/
729732
int freeRegionCount() {
730733
return freeRegions.size();

0 commit comments

Comments
 (0)