Skip to content

Commit 83c96d0

Browse files
volodk85dakrone
authored andcommitted
Write multiple cache gaps for warming service in one go (elastic#112071)
* Write multiple cache gaps for warming service in one go
1 parent 6699d1c commit 83c96d0

File tree

2 files changed

+60
-19
lines changed

2 files changed

+60
-19
lines changed

x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -967,12 +967,48 @@ void populate(
967967
listener.onResponse(false);
968968
return;
969969
}
970-
try (var gapsListener = new RefCountingListener(listener.map(unused -> true))) {
971-
assert writer.sharedInputStreamFactory(gaps) == null;
972-
for (SparseFileTracker.Gap gap : gaps) {
973-
executor.execute(
974-
fillGapRunnable(gap, writer, null, ActionListener.releaseAfter(gapsListener.acquire(), refs.acquire()))
975-
);
970+
final SourceInputStreamFactory streamFactory = writer.sharedInputStreamFactory(gaps);
971+
logger.trace(
972+
() -> Strings.format(
973+
"fill gaps %s %s shared input stream factory",
974+
gaps,
975+
streamFactory == null ? "without" : "with"
976+
)
977+
);
978+
if (streamFactory == null) {
979+
try (var parallelGapsListener = new RefCountingListener(listener.map(unused -> true))) {
980+
for (SparseFileTracker.Gap gap : gaps) {
981+
executor.execute(
982+
fillGapRunnable(
983+
gap,
984+
writer,
985+
null,
986+
ActionListener.releaseAfter(parallelGapsListener.acquire(), refs.acquire())
987+
)
988+
);
989+
}
990+
}
991+
} else {
992+
try (
993+
var sequentialGapsListener = new RefCountingListener(
994+
ActionListener.runBefore(listener.map(unused -> true), streamFactory::close)
995+
)
996+
) {
997+
final List<Runnable> gapFillingTasks = gaps.stream()
998+
.map(
999+
gap -> fillGapRunnable(
1000+
gap,
1001+
writer,
1002+
streamFactory,
1003+
ActionListener.releaseAfter(sequentialGapsListener.acquire(), refs.acquire())
1004+
)
1005+
)
1006+
.toList();
1007+
executor.execute(() -> {
1008+
// Fill the gaps in order. If a gap fails to fill for whatever reason, the task for filling the next
1009+
// gap will still be executed.
1010+
gapFillingTasks.forEach(Runnable::run);
1011+
});
9761012
}
9771013
}
9781014
}
@@ -1019,8 +1055,7 @@ void populateAndRead(
10191055
() -> Strings.format(
10201056
"fill gaps %s %s shared input stream factory",
10211057
gaps,
1022-
(streamFactory == null ? "without" : "with"),
1023-
(streamFactory == null ? "" : " " + streamFactory)
1058+
streamFactory == null ? "without" : "with"
10241059
)
10251060
);
10261061
if (streamFactory == null) {

x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1422,7 +1422,7 @@ protected int computeCacheFileRegionSize(long fileLength, int region) {
14221422
}
14231423
}
14241424

1425-
public void testSharedSourceInputStreamFactory() throws Exception {
1425+
public void testUsageSharedSourceInputStreamFactoryInCachePopulation() throws Exception {
14261426
final long regionSizeInBytes = size(100);
14271427
final Settings settings = Settings.builder()
14281428
.put(NODE_NAME_SETTING.getKey(), "node")
@@ -1519,16 +1519,22 @@ public void fillCacheRange(
15191519
};
15201520

15211521
final var range = ByteRange.of(0, regionSizeInBytes);
1522-
final PlainActionFuture<Integer> future = new PlainActionFuture<>();
1523-
region.populateAndRead(
1524-
range,
1525-
range,
1526-
(channel, channelPos, relativePos, length) -> length,
1527-
rangeMissingHandler,
1528-
threadPool.generic(),
1529-
future
1530-
);
1531-
safeGet(future);
1522+
if (randomBoolean()) {
1523+
final PlainActionFuture<Integer> future = new PlainActionFuture<>();
1524+
region.populateAndRead(
1525+
range,
1526+
range,
1527+
(channel, channelPos, relativePos, length) -> length,
1528+
rangeMissingHandler,
1529+
threadPool.generic(),
1530+
future
1531+
);
1532+
assertThat(safeGet(future).longValue(), equalTo(regionSizeInBytes));
1533+
} else {
1534+
final PlainActionFuture<Boolean> future = new PlainActionFuture<>();
1535+
region.populate(range, rangeMissingHandler, threadPool.generic(), future);
1536+
assertThat(safeGet(future), equalTo(true));
1537+
}
15321538
assertThat(invocationCounter.get(), equalTo(numberGaps));
15331539
assertThat(region.tracker.checkAvailable(regionSizeInBytes), is(true));
15341540
assertBusy(() -> assertThat(factoryClosed.get(), is(true)));

0 commit comments

Comments
 (0)