From df786193cc1c891660036156c4ce009bac9bc72b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Thu, 10 Jul 2025 11:25:23 +0200 Subject: [PATCH] Add fetchRange and fetchRegion to SharedBlobCacheService This commit adds two new methods to SharedBlobCacheService, fetchRange and fetchRegion. This new methods provide the same functionality as their maybe version, but the main difference is that they take a new boolean parameter (force) that allows the caller to specify if fetching the range/region should force an eviction if there are no free regions available. --- .../shared/SharedBlobCacheService.java | 125 +++-- .../shared/SharedBlobCacheServiceTests.java | 524 +++++++++++++----- 2 files changed, 455 insertions(+), 194 deletions(-) diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index 4a202562e5e3d..3fc66d504a76e 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -492,8 +492,7 @@ CacheFileRegion get(KeyType cacheKey, long fileLength, int region) { } /** - * Fetch and cache the full blob for the given cache entry from the remote repository if there - * are enough free pages in the cache to do so. + * Fetch and write in cache a region of a blob if there are enough free pages in the cache to do so. *

* This method returns as soon as the download tasks are instantiated, but the tasks themselves * are run on the bulk executor. @@ -502,67 +501,32 @@ CacheFileRegion get(KeyType cacheKey, long fileLength, int region) { * and unlinked * * @param cacheKey the key to fetch data for - * @param length the length of the blob to fetch + * @param region the region of the blob to fetch + * @param blobLength the length of the blob from which the region is fetched (used to compute the size of the ending region) * @param writer a writer that handles writing of newly downloaded data to the shared cache * @param fetchExecutor an executor to use for reading from the blob store - * @param listener listener that is called once all downloading has finished - * @return {@code true} if there were enough free pages to start downloading the full entry + * @param listener a listener that is completed with {@code true} if the current thread triggered the fetching of the region, in + * which case the data is available in cache. The listener is completed with {@code false} in every other cases: if + * the region to write is already available in cache, if the region is pending fetching via another thread or if + * there is not enough free pages to fetch the region. */ - public boolean maybeFetchFullEntry( - KeyType cacheKey, - long length, - RangeMissingHandler writer, - Executor fetchExecutor, - ActionListener listener + public void maybeFetchRegion( + final KeyType cacheKey, + final int region, + final long blobLength, + final RangeMissingHandler writer, + final Executor fetchExecutor, + final ActionListener listener ) { - int finalRegion = getEndingRegion(length); - // TODO freeRegionCount uses freeRegions.size() which is is NOT a constant-time operation. Can we do better? - if (freeRegionCount() < finalRegion) { - // Not enough room to download a full file without evicting existing data, so abort - listener.onResponse(null); - return false; - } - long regionLength = regionSize; - try (RefCountingListener refCountingListener = new RefCountingListener(listener)) { - for (int region = 0; region <= finalRegion; region++) { - if (region == finalRegion) { - regionLength = length - getRegionStart(region); - } - ByteRange rangeToWrite = ByteRange.of(0, regionLength); - if (rangeToWrite.isEmpty()) { - return true; - } - final ActionListener regionListener = refCountingListener.acquire(ignored -> {}); - final CacheFileRegion entry; - try { - entry = get(cacheKey, length, region); - } catch (AlreadyClosedException e) { - // failed to grab a cache page because some other operation concurrently acquired some - regionListener.onResponse(0); - return false; - } - // set read range == write range so the listener completes only once all the bytes have been downloaded - entry.populateAndRead( - rangeToWrite, - rangeToWrite, - (channel, pos, relativePos, len) -> Math.toIntExact(len), - writer, - fetchExecutor, - regionListener.delegateResponse((l, e) -> { - if (e instanceof AlreadyClosedException) { - l.onResponse(0); - } else { - l.onFailure(e); - } - }) - ); - } - } - return true; + fetchRegion(cacheKey, region, blobLength, writer, fetchExecutor, false, listener); } /** - * Fetch and write in cache a region of a blob if there are enough free pages in the cache to do so. + * Fetch and write in cache a region of a blob. + *

+ * If {@code force} is {@code true} and no free regions remain, an existing region will be evicted to make room. + *

+ * *

* This method returns as soon as the download tasks are instantiated, but the tasks themselves * are run on the bulk executor. @@ -575,20 +539,23 @@ public boolean maybeFetchFullEntry( * @param blobLength the length of the blob from which the region is fetched (used to compute the size of the ending region) * @param writer a writer that handles writing of newly downloaded data to the shared cache * @param fetchExecutor an executor to use for reading from the blob store + * @param force flag indicating whether the cache should free an occupied region to accommodate the requested + * region when none are free. * @param listener a listener that is completed with {@code true} if the current thread triggered the fetching of the region, in * which case the data is available in cache. The listener is completed with {@code false} in every other cases: if * the region to write is already available in cache, if the region is pending fetching via another thread or if * there is not enough free pages to fetch the region. */ - public void maybeFetchRegion( + public void fetchRegion( final KeyType cacheKey, final int region, final long blobLength, final RangeMissingHandler writer, final Executor fetchExecutor, + final boolean force, final ActionListener listener ) { - if (freeRegions.isEmpty() && maybeEvictLeastUsed() == false) { + if (force == false && freeRegions.isEmpty() && maybeEvictLeastUsed() == false) { // no free page available and no old enough unused region to be evicted logger.info("No free regions, skipping loading region [{}]", region); listener.onResponse(false); @@ -636,7 +603,45 @@ public void maybeFetchRange( final Executor fetchExecutor, final ActionListener listener ) { - if (freeRegions.isEmpty() && maybeEvictLeastUsed() == false) { + fetchRange(cacheKey, region, range, blobLength, writer, fetchExecutor, false, listener); + } + + /** + * Fetch and write in cache a range within a blob region. + *

+ * If {@code force} is {@code true} and no free regions remain, an existing region will be evicted to make room. + *

+ *

+ * This method returns as soon as the download tasks are instantiated, but the tasks themselves + * are run on the bulk executor. + *

+ * If an exception is thrown from the writer then the cache entry being downloaded is freed + * and unlinked + * + * @param cacheKey the key to fetch data for + * @param region the region of the blob + * @param range the range of the blob to fetch + * @param blobLength the length of the blob from which the region is fetched (used to compute the size of the ending region) + * @param writer a writer that handles writing of newly downloaded data to the shared cache + * @param fetchExecutor an executor to use for reading from the blob store + * @param force flag indicating whether the cache should free an occupied region to accommodate the requested + * range when none are free. + * @param listener a listener that is completed with {@code true} if the current thread triggered the fetching of the range, in + * which case the data is available in cache. The listener is completed with {@code false} in every other cases: if + * the range to write is already available in cache, if the range is pending fetching via another thread or if + * there is not enough free pages to fetch the range. + */ + public void fetchRange( + final KeyType cacheKey, + final int region, + final ByteRange range, + final long blobLength, + final RangeMissingHandler writer, + final Executor fetchExecutor, + final boolean force, + final ActionListener listener + ) { + if (force == false && freeRegions.isEmpty() && maybeEvictLeastUsed() == false) { // no free page available and no old enough unused region to be evicted logger.info("No free regions, skipping loading region [{}]", region); listener.onResponse(false); @@ -723,8 +728,6 @@ private static void throwAlreadyClosed(String message) { /** * NOTE: Method is package private mostly to allow checking the number of fee regions in tests. - * However, it is also used by {@link SharedBlobCacheService#maybeFetchFullEntry} but we should try - * to move away from that because calling "size" on a ConcurrentLinkedQueue is not a constant time operation. */ int freeRegionCount() { return freeRegions.size(); diff --git a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java index 04658606ce132..1b3335d47b1f0 100644 --- a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java +++ b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java @@ -565,139 +565,6 @@ public void testGetMultiThreaded() throws IOException { } } - public void testFetchFullCacheEntry() throws Exception { - Settings settings = Settings.builder() - .put(NODE_NAME_SETTING.getKey(), "node") - .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(500)).getStringRep()) - .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(100)).getStringRep()) - .put("path.home", createTempDir()) - .build(); - - final var bulkTaskCount = new AtomicInteger(0); - final var threadPool = new TestThreadPool("test"); - final var bulkExecutor = new StoppableExecutorServiceWrapper(threadPool.generic()) { - @Override - public void execute(Runnable command) { - super.execute(command); - bulkTaskCount.incrementAndGet(); - } - }; - - try ( - NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings)); - var cacheService = new SharedBlobCacheService<>( - environment, - settings, - threadPool, - threadPool.executor(ThreadPool.Names.GENERIC), - BlobCacheMetrics.NOOP - ) - ) { - { - final var cacheKey = generateCacheKey(); - assertEquals(5, cacheService.freeRegionCount()); - final long size = size(250); - AtomicLong bytesRead = new AtomicLong(size); - final PlainActionFuture future = new PlainActionFuture<>(); - cacheService.maybeFetchFullEntry( - cacheKey, - size, - (channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith( - completionListener, - () -> { - assert streamFactory == null : streamFactory; - bytesRead.addAndGet(-length); - progressUpdater.accept(length); - } - ), - bulkExecutor, - future - ); - - future.get(10, TimeUnit.SECONDS); - assertEquals(0L, bytesRead.get()); - assertEquals(2, cacheService.freeRegionCount()); - assertEquals(3, bulkTaskCount.get()); - } - { - // a download that would use up all regions should not run - final var cacheKey = generateCacheKey(); - assertEquals(2, cacheService.freeRegionCount()); - var configured = cacheService.maybeFetchFullEntry( - cacheKey, - size(500), - (ch, chPos, streamFactory, relPos, len, update, completionListener) -> completeWith(completionListener, () -> { - throw new AssertionError("Should never reach here"); - }), - bulkExecutor, - ActionListener.noop() - ); - assertFalse(configured); - assertEquals(2, cacheService.freeRegionCount()); - } - } - - threadPool.shutdown(); - } - - public void testFetchFullCacheEntryConcurrently() throws Exception { - Settings settings = Settings.builder() - .put(NODE_NAME_SETTING.getKey(), "node") - .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(500)).getStringRep()) - .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(100)).getStringRep()) - .put("path.home", createTempDir()) - .build(); - - final var threadPool = new TestThreadPool("test"); - final var bulkExecutor = new StoppableExecutorServiceWrapper(threadPool.generic()); - - try ( - NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings)); - var cacheService = new SharedBlobCacheService<>( - environment, - settings, - threadPool, - threadPool.executor(ThreadPool.Names.GENERIC), - BlobCacheMetrics.NOOP - ) - ) { - - final long size = size(randomIntBetween(1, 100)); - final Thread[] threads = new Thread[10]; - for (int i = 0; i < threads.length; i++) { - threads[i] = new Thread(() -> { - for (int j = 0; j < 1000; j++) { - final var cacheKey = generateCacheKey(); - safeAwait( - (ActionListener listener) -> cacheService.maybeFetchFullEntry( - cacheKey, - size, - ( - channel, - channelPos, - streamFactory, - relativePos, - length, - progressUpdater, - completionListener) -> completeWith(completionListener, () -> progressUpdater.accept(length)), - bulkExecutor, - listener - ) - ); - } - }); - } - for (Thread thread : threads) { - thread.start(); - } - for (Thread thread : threads) { - thread.join(); - } - } finally { - assertTrue(ThreadPool.terminate(threadPool, 10L, TimeUnit.SECONDS)); - } - } - public void testCacheSizeRejectedOnNonFrozenNodes() { String cacheSize = randomBoolean() ? ByteSizeValue.ofBytes(size(500)).getStringRep() @@ -1130,6 +997,195 @@ public void execute(Runnable command) { threadPool.shutdown(); } + public void testFetchRegion() throws Exception { + final long cacheSize = size(500L); + final long regionSize = size(100L); + Settings settings = Settings.builder() + .put(NODE_NAME_SETTING.getKey(), "node") + .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(cacheSize).getStringRep()) + .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(regionSize).getStringRep()) + .put("path.home", createTempDir()) + .build(); + + final var bulkTaskCount = new AtomicInteger(0); + final var threadPool = new TestThreadPool("test"); + final var bulkExecutor = new StoppableExecutorServiceWrapper(threadPool.generic()) { + @Override + public void execute(Runnable command) { + super.execute(command); + bulkTaskCount.incrementAndGet(); + } + }; + + try ( + NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings)); + var cacheService = new SharedBlobCacheService<>( + environment, + settings, + threadPool, + threadPool.executor(ThreadPool.Names.GENERIC), + BlobCacheMetrics.NOOP + ) + ) { + { + // fetch a single region + final var cacheKey = generateCacheKey(); + assertEquals(5, cacheService.freeRegionCount()); + final long blobLength = size(250); // 3 regions + AtomicLong bytesRead = new AtomicLong(0L); + final PlainActionFuture future = new PlainActionFuture<>(); + cacheService.fetchRegion( + cacheKey, + 0, + blobLength, + (channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith( + completionListener, + () -> { + assert streamFactory == null : streamFactory; + bytesRead.addAndGet(length); + progressUpdater.accept(length); + } + ), + bulkExecutor, + true, + future + ); + + var fetched = future.get(10, TimeUnit.SECONDS); + assertThat("Region has been fetched", fetched, is(true)); + assertEquals(regionSize, bytesRead.get()); + assertEquals(4, cacheService.freeRegionCount()); + assertEquals(1, bulkTaskCount.get()); + } + { + // fetch multiple regions to used all the cache + final int remainingFreeRegions = cacheService.freeRegionCount(); + assertEquals(4, cacheService.freeRegionCount()); + + final var cacheKey = generateCacheKey(); + final long blobLength = regionSize * remainingFreeRegions; + AtomicLong bytesRead = new AtomicLong(0L); + + final PlainActionFuture> future = new PlainActionFuture<>(); + final var listener = new GroupedActionListener<>(remainingFreeRegions, future); + for (int region = 0; region < remainingFreeRegions; region++) { + cacheService.fetchRegion( + cacheKey, + region, + blobLength, + (channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith( + completionListener, + () -> { + assert streamFactory == null : streamFactory; + bytesRead.addAndGet(length); + progressUpdater.accept(length); + } + ), + bulkExecutor, + true, + listener + ); + } + + var results = future.get(10, TimeUnit.SECONDS); + assertThat(results.stream().allMatch(result -> result), is(true)); + assertEquals(blobLength, bytesRead.get()); + assertEquals(0, cacheService.freeRegionCount()); + assertEquals(1 + remainingFreeRegions, bulkTaskCount.get()); + } + { + // cache fully used, no entry old enough to be evicted and force=false should not evict entries + assertEquals(0, cacheService.freeRegionCount()); + final var cacheKey = generateCacheKey(); + final PlainActionFuture future = new PlainActionFuture<>(); + cacheService.fetchRegion( + cacheKey, + 0, + regionSize, + (channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith( + completionListener, + () -> { + throw new AssertionError("should not be executed"); + } + ), + bulkExecutor, + false, + future + ); + assertThat("Listener is immediately completed", future.isDone(), is(true)); + assertThat("Region already exists in cache", future.get(), is(false)); + } + { + // cache fully used, but force=true, so the cache should evict regions to make space for the requested regions + assertEquals(0, cacheService.freeRegionCount()); + AtomicLong bytesRead = new AtomicLong(0L); + final var cacheKey = generateCacheKey(); + final PlainActionFuture> future = new PlainActionFuture<>(); + var regionsToFetch = randomIntBetween(1, (int) (cacheSize / regionSize)); + final var listener = new GroupedActionListener<>(regionsToFetch, future); + long blobLength = regionsToFetch * regionSize; + for (int region = 0; region < regionsToFetch; region++) { + cacheService.fetchRegion( + cacheKey, + region, + blobLength, + (channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith( + completionListener, + () -> { + assert streamFactory == null : streamFactory; + bytesRead.addAndGet(length); + progressUpdater.accept(length); + } + ), + bulkExecutor, + true, + listener + ); + } + + var results = future.get(10, TimeUnit.SECONDS); + assertThat(results.stream().allMatch(result -> result), is(true)); + assertEquals(blobLength, bytesRead.get()); + assertEquals(0, cacheService.freeRegionCount()); + assertEquals(regionsToFetch + 5, bulkTaskCount.get()); + } + { + cacheService.computeDecay(); + + // We explicitly called computeDecay, meaning that some regions must have been demoted to level 0, + // therefore there should be enough room to fetch the requested range regardless of the force flag. + final var cacheKey = generateCacheKey(); + assertEquals(0, cacheService.freeRegionCount()); + long blobLength = randomLongBetween(1L, regionSize); + AtomicLong bytesRead = new AtomicLong(0L); + final PlainActionFuture future = new PlainActionFuture<>(); + cacheService.fetchRegion( + cacheKey, + 0, + blobLength, + (channel, channelPos, ignore, relativePos, length, progressUpdater, completionListener) -> completeWith( + completionListener, + () -> { + assert ignore == null : ignore; + bytesRead.addAndGet(length); + progressUpdater.accept(length); + } + ), + bulkExecutor, + randomBoolean(), + future + ); + + var fetched = future.get(10, TimeUnit.SECONDS); + assertThat("Region has been fetched", fetched, is(true)); + assertEquals(blobLength, bytesRead.get()); + assertEquals(0, cacheService.freeRegionCount()); + } + } finally { + TestThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + } + public void testMaybeFetchRange() throws Exception { final long cacheSize = size(500L); final long regionSize = size(100L); @@ -1301,6 +1357,208 @@ public void execute(Runnable command) { threadPool.shutdown(); } + public void testFetchRange() throws Exception { + final long cacheSize = size(500L); + final long regionSize = size(100L); + Settings settings = Settings.builder() + .put(NODE_NAME_SETTING.getKey(), "node") + .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(cacheSize).getStringRep()) + .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(regionSize).getStringRep()) + .put("path.home", createTempDir()) + .build(); + + final var bulkTaskCount = new AtomicInteger(0); + final var threadPool = new TestThreadPool("test"); + final var bulkExecutor = new StoppableExecutorServiceWrapper(threadPool.generic()) { + @Override + public void execute(Runnable command) { + super.execute(command); + bulkTaskCount.incrementAndGet(); + } + }; + + try ( + NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings)); + var cacheService = new SharedBlobCacheService<>( + environment, + settings, + threadPool, + threadPool.executor(ThreadPool.Names.GENERIC), + BlobCacheMetrics.NOOP + ) + ) { + { + // fetch a random range in a random region of the blob + final var cacheKey = generateCacheKey(); + assertEquals(5, cacheService.freeRegionCount()); + + // blobLength is 1024000 bytes and requires 3 regions + final long blobLength = size(250); + final var regions = List.of( + // region 0: 0-409600 + ByteRange.of(cacheService.getRegionStart(0), cacheService.getRegionEnd(0)), + // region 1: 409600-819200 + ByteRange.of(cacheService.getRegionStart(1), cacheService.getRegionEnd(1)), + // region 2: 819200-1228800 + ByteRange.of(cacheService.getRegionStart(2), cacheService.getRegionEnd(2)) + ); + + long pos = randomLongBetween(0, blobLength - 1L); + long len = randomLongBetween(1, blobLength - pos); + var range = ByteRange.of(pos, pos + len); + var region = between(0, regions.size() - 1); + var regionRange = cacheService.mapSubRangeToRegion(range, region); + + var bytesCopied = new AtomicLong(0L); + var future = new PlainActionFuture(); + cacheService.maybeFetchRange( + cacheKey, + region, + range, + blobLength, + (channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith( + completionListener, + () -> { + assertThat(range.start() + relativePos, equalTo(cacheService.getRegionStart(region) + regionRange.start())); + assertThat(channelPos, equalTo(Math.toIntExact(regionRange.start()))); + assertThat(length, equalTo(Math.toIntExact(regionRange.length()))); + bytesCopied.addAndGet(length); + } + ), + bulkExecutor, + future + ); + var fetched = future.get(10, TimeUnit.SECONDS); + + assertThat(regionRange.length(), equalTo(bytesCopied.get())); + if (regionRange.isEmpty()) { + assertThat(fetched, is(false)); + assertEquals(5, cacheService.freeRegionCount()); + assertEquals(0, bulkTaskCount.get()); + } else { + assertThat(fetched, is(true)); + assertEquals(4, cacheService.freeRegionCount()); + assertEquals(1, bulkTaskCount.get()); + } + } + { + // fetch multiple ranges to use all the cache + final int remainingFreeRegions = cacheService.freeRegionCount(); + assertThat(remainingFreeRegions, greaterThanOrEqualTo(4)); + bulkTaskCount.set(0); + + final var cacheKey = generateCacheKey(); + final long blobLength = regionSize * remainingFreeRegions; + AtomicLong bytesCopied = new AtomicLong(0L); + + final PlainActionFuture> future = new PlainActionFuture<>(); + final var listener = new GroupedActionListener<>(remainingFreeRegions, future); + for (int region = 0; region < remainingFreeRegions; region++) { + cacheService.fetchRange( + cacheKey, + region, + ByteRange.of(0L, blobLength), + blobLength, + (channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith( + completionListener, + () -> bytesCopied.addAndGet(length) + ), + bulkExecutor, + true, + listener + ); + } + + var results = future.get(10, TimeUnit.SECONDS); + assertThat(results.stream().allMatch(result -> result), is(true)); + assertEquals(blobLength, bytesCopied.get()); + assertEquals(0, cacheService.freeRegionCount()); + assertEquals(remainingFreeRegions, bulkTaskCount.get()); + } + { + // cache fully used, no entry old enough to be evicted and force=false + assertEquals(0, cacheService.freeRegionCount()); + final var cacheKey = generateCacheKey(); + final var blobLength = randomLongBetween(1L, regionSize); + final PlainActionFuture future = new PlainActionFuture<>(); + cacheService.fetchRange( + cacheKey, + randomIntBetween(0, 10), + ByteRange.of(0L, blobLength), + blobLength, + (channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith( + completionListener, + () -> { + throw new AssertionError("should not be executed"); + } + ), + bulkExecutor, + false, + future + ); + assertThat("Listener is immediately completed", future.isDone(), is(true)); + assertThat("Region already exists in cache", future.get(), is(false)); + } + { + // cache fully used, since force=true the range should be populated + final var cacheKey = generateCacheKey(); + assertEquals(0, cacheService.freeRegionCount()); + long blobLength = randomLongBetween(1L, regionSize); + AtomicLong bytesCopied = new AtomicLong(0L); + final PlainActionFuture future = new PlainActionFuture<>(); + cacheService.fetchRange( + cacheKey, + 0, + ByteRange.of(0L, blobLength), + blobLength, + (channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith( + completionListener, + () -> bytesCopied.addAndGet(length) + ), + bulkExecutor, + true, + future + ); + + var fetched = future.get(10, TimeUnit.SECONDS); + assertThat("Region has been fetched", fetched, is(true)); + assertEquals(blobLength, bytesCopied.get()); + assertEquals(0, cacheService.freeRegionCount()); + } + { + cacheService.computeDecay(); + + // We explicitly called computeDecay, meaning that some regions must have been demoted to level 0, + // therefore there should be enough room to fetch the requested range regardless of the force flag. + final var cacheKey = generateCacheKey(); + assertEquals(0, cacheService.freeRegionCount()); + long blobLength = randomLongBetween(1L, regionSize); + AtomicLong bytesCopied = new AtomicLong(0L); + final PlainActionFuture future = new PlainActionFuture<>(); + cacheService.fetchRange( + cacheKey, + 0, + ByteRange.of(0L, blobLength), + blobLength, + (channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith( + completionListener, + () -> bytesCopied.addAndGet(length) + ), + bulkExecutor, + randomBoolean(), + future + ); + + var fetched = future.get(10, TimeUnit.SECONDS); + assertThat("Region has been fetched", fetched, is(true)); + assertEquals(blobLength, bytesCopied.get()); + assertEquals(0, cacheService.freeRegionCount()); + } + } finally { + TestThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + } + public void testPopulate() throws Exception { final long regionSize = size(1L); Settings settings = Settings.builder()