@@ -492,8 +492,7 @@ CacheFileRegion<KeyType> get(KeyType cacheKey, long fileLength, int region) {
492492 }
493493
494494 /**
495- * Fetch and cache the full blob for the given cache entry from the remote repository if there
496- * are enough free pages in the cache to do so.
495+ * Fetch and write in cache a region of a blob if there are enough free pages in the cache to do so.
497496 * <p>
498497 * This method returns as soon as the download tasks are instantiated, but the tasks themselves
499498 * are run on the bulk executor.
@@ -502,67 +501,32 @@ CacheFileRegion<KeyType> get(KeyType cacheKey, long fileLength, int region) {
502501 * and unlinked
503502 *
504503 * @param cacheKey the key to fetch data for
505- * @param length the length of the blob to fetch
504+ * @param region the region of the blob to fetch
505+ * @param blobLength the length of the blob from which the region is fetched (used to compute the size of the ending region)
506506 * @param writer a writer that handles writing of newly downloaded data to the shared cache
507507 * @param fetchExecutor an executor to use for reading from the blob store
508- * @param listener listener that is called once all downloading has finished
509- * @return {@code true} if there were enough free pages to start downloading the full entry
508+ * @param listener a listener that is completed with {@code true} if the current thread triggered the fetching of the region, in
509+ * which case the data is available in cache. The listener is completed with {@code false} in every other cases: if
510+ * the region to write is already available in cache, if the region is pending fetching via another thread or if
511+ * there is not enough free pages to fetch the region.
510512 */
511- public boolean maybeFetchFullEntry (
512- KeyType cacheKey ,
513- long length ,
514- RangeMissingHandler writer ,
515- Executor fetchExecutor ,
516- ActionListener <Void > listener
513+ public void maybeFetchRegion (
514+ final KeyType cacheKey ,
515+ final int region ,
516+ final long blobLength ,
517+ final RangeMissingHandler writer ,
518+ final Executor fetchExecutor ,
519+ final ActionListener <Boolean > listener
517520 ) {
518- int finalRegion = getEndingRegion (length );
519- // TODO freeRegionCount uses freeRegions.size() which is is NOT a constant-time operation. Can we do better?
520- if (freeRegionCount () < finalRegion ) {
521- // Not enough room to download a full file without evicting existing data, so abort
522- listener .onResponse (null );
523- return false ;
524- }
525- long regionLength = regionSize ;
526- try (RefCountingListener refCountingListener = new RefCountingListener (listener )) {
527- for (int region = 0 ; region <= finalRegion ; region ++) {
528- if (region == finalRegion ) {
529- regionLength = length - getRegionStart (region );
530- }
531- ByteRange rangeToWrite = ByteRange .of (0 , regionLength );
532- if (rangeToWrite .isEmpty ()) {
533- return true ;
534- }
535- final ActionListener <Integer > regionListener = refCountingListener .acquire (ignored -> {});
536- final CacheFileRegion <KeyType > entry ;
537- try {
538- entry = get (cacheKey , length , region );
539- } catch (AlreadyClosedException e ) {
540- // failed to grab a cache page because some other operation concurrently acquired some
541- regionListener .onResponse (0 );
542- return false ;
543- }
544- // set read range == write range so the listener completes only once all the bytes have been downloaded
545- entry .populateAndRead (
546- rangeToWrite ,
547- rangeToWrite ,
548- (channel , pos , relativePos , len ) -> Math .toIntExact (len ),
549- writer ,
550- fetchExecutor ,
551- regionListener .delegateResponse ((l , e ) -> {
552- if (e instanceof AlreadyClosedException ) {
553- l .onResponse (0 );
554- } else {
555- l .onFailure (e );
556- }
557- })
558- );
559- }
560- }
561- return true ;
521+ fetchRegion (cacheKey , region , blobLength , writer , fetchExecutor , false , listener );
562522 }
563523
564524 /**
565- * Fetch and write in cache a region of a blob if there are enough free pages in the cache to do so.
525+ * Fetch and write in cache a region of a blob.
526+ * <p>
527+ * If {@code force} is {@code true} and no free regions remain, an existing region will be evicted to make room.
528+ * </p>
529+ *
566530 * <p>
567531 * This method returns as soon as the download tasks are instantiated, but the tasks themselves
568532 * are run on the bulk executor.
@@ -575,20 +539,23 @@ public boolean maybeFetchFullEntry(
575539 * @param blobLength the length of the blob from which the region is fetched (used to compute the size of the ending region)
576540 * @param writer a writer that handles writing of newly downloaded data to the shared cache
577541 * @param fetchExecutor an executor to use for reading from the blob store
542+ * @param force flag indicating whether the cache should free an occupied region to accommodate the requested
543+ * region when none are free.
578544 * @param listener a listener that is completed with {@code true} if the current thread triggered the fetching of the region, in
579545 * which case the data is available in cache. The listener is completed with {@code false} in every other cases: if
580546 * the region to write is already available in cache, if the region is pending fetching via another thread or if
581547 * there is not enough free pages to fetch the region.
582548 */
583- public void maybeFetchRegion (
549+ public void fetchRegion (
584550 final KeyType cacheKey ,
585551 final int region ,
586552 final long blobLength ,
587553 final RangeMissingHandler writer ,
588554 final Executor fetchExecutor ,
555+ final boolean force ,
589556 final ActionListener <Boolean > listener
590557 ) {
591- if (freeRegions .isEmpty () && maybeEvictLeastUsed () == false ) {
558+ if (force == false && freeRegions .isEmpty () && maybeEvictLeastUsed () == false ) {
592559 // no free page available and no old enough unused region to be evicted
593560 logger .info ("No free regions, skipping loading region [{}]" , region );
594561 listener .onResponse (false );
@@ -636,7 +603,45 @@ public void maybeFetchRange(
636603 final Executor fetchExecutor ,
637604 final ActionListener <Boolean > listener
638605 ) {
639- if (freeRegions .isEmpty () && maybeEvictLeastUsed () == false ) {
606+ fetchRange (cacheKey , region , range , blobLength , writer , fetchExecutor , false , listener );
607+ }
608+
609+ /**
610+ * Fetch and write in cache a range within a blob region.
611+ * <p>
612+ * If {@code force} is {@code true} and no free regions remain, an existing region will be evicted to make room.
613+ * </p>
614+ * <p>
615+ * This method returns as soon as the download tasks are instantiated, but the tasks themselves
616+ * are run on the bulk executor.
617+ * <p>
618+ * If an exception is thrown from the writer then the cache entry being downloaded is freed
619+ * and unlinked
620+ *
621+ * @param cacheKey the key to fetch data for
622+ * @param region the region of the blob
623+ * @param range the range of the blob to fetch
624+ * @param blobLength the length of the blob from which the region is fetched (used to compute the size of the ending region)
625+ * @param writer a writer that handles writing of newly downloaded data to the shared cache
626+ * @param fetchExecutor an executor to use for reading from the blob store
627+ * @param force flag indicating whether the cache should free an occupied region to accommodate the requested
628+ * range when none are free.
629+ * @param listener a listener that is completed with {@code true} if the current thread triggered the fetching of the range, in
630+ * which case the data is available in cache. The listener is completed with {@code false} in every other cases: if
631+ * the range to write is already available in cache, if the range is pending fetching via another thread or if
632+ * there is not enough free pages to fetch the range.
633+ */
634+ public void fetchRange (
635+ final KeyType cacheKey ,
636+ final int region ,
637+ final ByteRange range ,
638+ final long blobLength ,
639+ final RangeMissingHandler writer ,
640+ final Executor fetchExecutor ,
641+ final boolean force ,
642+ final ActionListener <Boolean > listener
643+ ) {
644+ if (force == false && freeRegions .isEmpty () && maybeEvictLeastUsed () == false ) {
640645 // no free page available and no old enough unused region to be evicted
641646 logger .info ("No free regions, skipping loading region [{}]" , region );
642647 listener .onResponse (false );
@@ -723,8 +728,6 @@ private static void throwAlreadyClosed(String message) {
723728
724729 /**
725730 * NOTE: Method is package private mostly to allow checking the number of fee regions in tests.
726- * However, it is also used by {@link SharedBlobCacheService#maybeFetchFullEntry} but we should try
727- * to move away from that because calling "size" on a ConcurrentLinkedQueue is not a constant time operation.
728731 */
729732 int freeRegionCount () {
730733 return freeRegions .size ();
0 commit comments