Skip to content

Commit 602821e

Browse files
committed
Only create the writer for the cache when needed
Creating the writer to the cache of newly downloaded data can be quite involved (requires quite a few objects to be created underneath). However, this writer is only really used if we have things to download from the object store. This proposes we delay creating the writer until it's actually needed, such that the fast path of `maybeFetch...`ing something avoids creating the entire machinery of `SequentialRangeMissingHandler` and the needed `BlobCacheReader`.
1 parent 9e0d885 commit 602821e

File tree

2 files changed

+60
-57
lines changed

2 files changed

+60
-57
lines changed

x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java

Lines changed: 32 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import java.util.function.IntConsumer;
6565
import java.util.function.LongSupplier;
6666
import java.util.function.Predicate;
67+
import java.util.function.Supplier;
6768
import java.util.stream.Collectors;
6869

6970
/**
@@ -553,21 +554,21 @@ public boolean maybeFetchFullEntry(
553554
* If an exception is thrown from the writer then the cache entry being downloaded is freed
554555
* and unlinked
555556
*
556-
* @param cacheKey the key to fetch data for
557-
* @param region the region of the blob to fetch
558-
* @param blobLength the length of the blob from which the region is fetched (used to compute the size of the ending region)
559-
* @param writer a writer that handles writing of newly downloaded data to the shared cache
560-
* @param fetchExecutor an executor to use for reading from the blob store
561-
* @param listener a listener that is completed with {@code true} if the current thread triggered the fetching of the region, in
562-
* which case the data is available in cache. The listener is completed with {@code false} in every other cases: if
563-
* the region to write is already available in cache, if the region is pending fetching via another thread or if
564-
* there is not enough free pages to fetch the region.
557+
* @param cacheKey the key to fetch data for
558+
* @param region the region of the blob to fetch
559+
* @param blobLength the length of the blob from which the region is fetched (used to compute the size of the ending region)
560+
* @param writerSupplier a supplier for the writer that handles writing of newly downloaded data to the shared cache
561+
* @param fetchExecutor an executor to use for reading from the blob store
562+
* @param listener a listener that is completed with {@code true} if the current thread triggered the fetching of the region, in
563+
* which case the data is available in cache. The listener is completed with {@code false} in every other cases:
564+
* if the region to write is already available in cache, if the region is pending fetching via another thread or
565+
* if there is not enough free pages to fetch the region.
565566
*/
566567
public void maybeFetchRegion(
567568
final KeyType cacheKey,
568569
final int region,
569570
final long blobLength,
570-
final RangeMissingHandler writer,
571+
final Supplier<RangeMissingHandler> writerSupplier,
571572
final Executor fetchExecutor,
572573
final ActionListener<Boolean> listener
573574
) {
@@ -584,7 +585,7 @@ public void maybeFetchRegion(
584585
return;
585586
}
586587
final CacheFileRegion<KeyType> entry = get(cacheKey, blobLength, region);
587-
entry.populate(regionRange, writer, fetchExecutor, listener);
588+
entry.populate(regionRange, writerSupplier, fetchExecutor, listener);
588589
} catch (Exception e) {
589590
listener.onFailure(e);
590591
}
@@ -599,23 +600,23 @@ public void maybeFetchRegion(
599600
* If an exception is thrown from the writer then the cache entry being downloaded is freed
600601
* and unlinked
601602
*
602-
* @param cacheKey the key to fetch data for
603-
* @param region the region of the blob
604-
* @param range the range of the blob to fetch
605-
* @param blobLength the length of the blob from which the region is fetched (used to compute the size of the ending region)
606-
* @param writer a writer that handles writing of newly downloaded data to the shared cache
607-
* @param fetchExecutor an executor to use for reading from the blob store
608-
* @param listener a listener that is completed with {@code true} if the current thread triggered the fetching of the range, in
609-
* which case the data is available in cache. The listener is completed with {@code false} in every other cases: if
610-
* the range to write is already available in cache, if the range is pending fetching via another thread or if
611-
* there is not enough free pages to fetch the range.
603+
* @param cacheKey the key to fetch data for
604+
* @param region the region of the blob
605+
* @param range the range of the blob to fetch
606+
* @param blobLength the length of the blob from which the region is fetched (used to compute the size of the ending region)
607+
* @param writerSupplier a supplier for the writer that handles writing of newly downloaded data to the shared cache
608+
* @param fetchExecutor an executor to use for reading from the blob store
609+
* @param listener a listener that is completed with {@code true} if the current thread triggered the fetching of the range, in
610+
* which case the data is available in cache. The listener is completed with {@code false} in every other cases:
611+
* if the range to write is already available in cache, if the range is pending fetching via another thread or if
612+
* there is not enough free pages to fetch the range.
612613
*/
613614
public void maybeFetchRange(
614615
final KeyType cacheKey,
615616
final int region,
616617
final ByteRange range,
617618
final long blobLength,
618-
final RangeMissingHandler writer,
619+
final Supplier<RangeMissingHandler> writerSupplier,
619620
final Executor fetchExecutor,
620621
final ActionListener<Boolean> listener
621622
) {
@@ -634,7 +635,7 @@ public void maybeFetchRange(
634635
final CacheFileRegion<KeyType> entry = get(cacheKey, blobLength, region);
635636
entry.populate(
636637
regionRange,
637-
writerWithOffset(writer, Math.toIntExact(range.start() - getRegionStart(region))),
638+
writerWithOffset(writerSupplier, Math.toIntExact(range.start() - getRegionStart(region))),
638639
fetchExecutor,
639640
listener
640641
);
@@ -643,20 +644,13 @@ public void maybeFetchRange(
643644
}
644645
}
645646

646-
private RangeMissingHandler writerWithOffset(RangeMissingHandler writer, int writeOffset) {
647+
private Supplier<RangeMissingHandler> writerWithOffset(Supplier<RangeMissingHandler> writerSupplier, int writeOffset) {
647648
if (writeOffset == 0) {
648649
// no need to allocate a new capturing lambda if the offset isn't adjusted
649-
return writer;
650-
}
651-
return (channel, channelPos, streamFactory, relativePos, len, progressUpdater, completionListener) -> writer.fillCacheRange(
652-
channel,
653-
channelPos,
654-
streamFactory,
655-
relativePos - writeOffset,
656-
len,
657-
progressUpdater,
658-
completionListener
659-
);
650+
return writerSupplier;
651+
}
652+
return () -> (channel, channelPos, streamFactory, relativePos, len, progressUpdater, completionListener) -> writerSupplier.get()
653+
.fillCacheRange(channel, channelPos, streamFactory, relativePos - writeOffset, len, progressUpdater, completionListener);
660654
}
661655

662656
// used by tests
@@ -940,7 +934,7 @@ boolean tryRead(ByteBuffer buf, long offset) throws IOException {
940934
* Populates a range in cache if the range is not available nor pending to be available in cache.
941935
*
942936
* @param rangeToWrite the range of bytes to populate
943-
* @param writer a writer that handles writing of newly downloaded data to the shared cache
937+
* @param writerSupplier a supplier for the writer that handles writing of newly downloaded data to the shared cache
944938
* @param executor the executor used to download and to write new dat
945939
* @param listener a listener that is completed with {@code true} if the current thread triggered the download and write of the
946940
* range, in which case the listener is completed once writing is done. The listener is completed with {@code false}
@@ -949,7 +943,7 @@ boolean tryRead(ByteBuffer buf, long offset) throws IOException {
949943
*/
950944
void populate(
951945
final ByteRange rangeToWrite,
952-
final RangeMissingHandler writer,
946+
final Supplier<RangeMissingHandler> writerSupplier,
953947
final Executor executor,
954948
final ActionListener<Boolean> listener
955949
) {
@@ -967,6 +961,7 @@ void populate(
967961
listener.onResponse(false);
968962
return;
969963
}
964+
RangeMissingHandler writer = writerSupplier.get();
970965
final SourceInputStreamFactory streamFactory = writer.sharedInputStreamFactory(gaps);
971966
logger.trace(
972967
() -> Strings.format(

x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -885,7 +885,7 @@ public void testMaybeEvictLeastUsed() throws Exception {
885885
var entry = cacheService.get(cacheKey, regionSize, 0);
886886
entry.populate(
887887
ByteRange.of(0L, regionSize),
888-
(channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith(
888+
() -> (channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith(
889889
completionListener,
890890
() -> progressUpdater.accept(length)
891891
),
@@ -983,7 +983,7 @@ public void execute(Runnable command) {
983983
cacheKey,
984984
0,
985985
blobLength,
986-
(channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith(
986+
() -> (channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith(
987987
completionListener,
988988
() -> {
989989
assert streamFactory == null : streamFactory;
@@ -1017,14 +1017,18 @@ public void execute(Runnable command) {
10171017
cacheKey,
10181018
region,
10191019
blobLength,
1020-
(channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith(
1021-
completionListener,
1022-
() -> {
1020+
() -> (
1021+
channel,
1022+
channelPos,
1023+
streamFactory,
1024+
relativePos,
1025+
length,
1026+
progressUpdater,
1027+
completionListener) -> completeWith(completionListener, () -> {
10231028
assert streamFactory == null : streamFactory;
10241029
bytesRead.addAndGet(length);
10251030
progressUpdater.accept(length);
1026-
}
1027-
),
1031+
}),
10281032
bulkExecutor,
10291033
listener
10301034
);
@@ -1045,7 +1049,7 @@ public void execute(Runnable command) {
10451049
cacheKey,
10461050
randomIntBetween(0, 10),
10471051
randomLongBetween(1L, regionSize),
1048-
(channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith(
1052+
() -> (channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith(
10491053
completionListener,
10501054
() -> {
10511055
throw new AssertionError("should not be executed");
@@ -1070,7 +1074,7 @@ public void execute(Runnable command) {
10701074
cacheKey,
10711075
0,
10721076
blobLength,
1073-
(channel, channelPos, ignore, relativePos, length, progressUpdater, completionListener) -> completeWith(
1077+
() -> (channel, channelPos, ignore, relativePos, length, progressUpdater, completionListener) -> completeWith(
10741078
completionListener,
10751079
() -> {
10761080
assert ignore == null : ignore;
@@ -1151,7 +1155,7 @@ public void execute(Runnable command) {
11511155
region,
11521156
range,
11531157
blobLength,
1154-
(channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith(
1158+
() -> (channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith(
11551159
completionListener,
11561160
() -> {
11571161
assertThat(range.start() + relativePos, equalTo(cacheService.getRegionStart(region) + regionRange.start()));
@@ -1194,10 +1198,14 @@ public void execute(Runnable command) {
11941198
region,
11951199
ByteRange.of(0L, blobLength),
11961200
blobLength,
1197-
(channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith(
1198-
completionListener,
1199-
() -> bytesCopied.addAndGet(length)
1200-
),
1201+
() -> (
1202+
channel,
1203+
channelPos,
1204+
streamFactory,
1205+
relativePos,
1206+
length,
1207+
progressUpdater,
1208+
completionListener) -> completeWith(completionListener, () -> bytesCopied.addAndGet(length)),
12011209
bulkExecutor,
12021210
listener
12031211
);
@@ -1220,7 +1228,7 @@ public void execute(Runnable command) {
12201228
randomIntBetween(0, 10),
12211229
ByteRange.of(0L, blobLength),
12221230
blobLength,
1223-
(channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith(
1231+
() -> (channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith(
12241232
completionListener,
12251233
() -> {
12261234
throw new AssertionError("should not be executed");
@@ -1246,7 +1254,7 @@ public void execute(Runnable command) {
12461254
0,
12471255
ByteRange.of(0L, blobLength),
12481256
blobLength,
1249-
(channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith(
1257+
() -> (channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith(
12501258
completionListener,
12511259
() -> bytesCopied.addAndGet(length)
12521260
),
@@ -1292,7 +1300,7 @@ public void testPopulate() throws Exception {
12921300
final PlainActionFuture<Boolean> future1 = new PlainActionFuture<>();
12931301
entry.populate(
12941302
ByteRange.of(0, regionSize - 1),
1295-
(channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith(
1303+
() -> (channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith(
12961304
completionListener,
12971305
() -> {
12981306
bytesWritten.addAndGet(length);
@@ -1311,7 +1319,7 @@ public void testPopulate() throws Exception {
13111319
final PlainActionFuture<Boolean> future2 = new PlainActionFuture<>();
13121320
entry.populate(
13131321
ByteRange.of(0, regionSize - 1),
1314-
(channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith(
1322+
() -> (channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith(
13151323
completionListener,
13161324
() -> {
13171325
bytesWritten.addAndGet(length);
@@ -1327,7 +1335,7 @@ public void testPopulate() throws Exception {
13271335
final PlainActionFuture<Boolean> future3 = new PlainActionFuture<>();
13281336
entry.populate(
13291337
ByteRange.of(0, regionSize - 1),
1330-
(channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith(
1338+
() -> (channel, channelPos, streamFactory, relativePos, length, progressUpdater, completionListener) -> completeWith(
13311339
completionListener,
13321340
() -> {
13331341
bytesWritten.addAndGet(length);
@@ -1531,7 +1539,7 @@ public void fillCacheRange(
15311539
assertThat(safeGet(future).longValue(), equalTo(regionSizeInBytes));
15321540
} else {
15331541
final PlainActionFuture<Boolean> future = new PlainActionFuture<>();
1534-
region.populate(range, rangeMissingHandler, threadPool.generic(), future);
1542+
region.populate(range, () -> rangeMissingHandler, threadPool.generic(), future);
15351543
assertThat(safeGet(future), equalTo(true));
15361544
}
15371545
assertThat(invocationCounter.get(), equalTo(numberGaps));

0 commit comments

Comments
 (0)