From ad8bf1c8748db1f2bb72f605b15e392f7661db76 Mon Sep 17 00:00:00 2001 From: Julian Sedding Date: Tue, 30 Sep 2025 02:36:21 +0200 Subject: [PATCH 1/5] OAK-11934 - segment preloading for PersistentCache --- .../oak/segment/file/AbstractFileStore.java | 53 ++- .../oak/segment/file/FileStore.java | 27 +- .../oak/segment/file/FileStoreBuilder.java | 52 ++- .../oak/segment/file/ReadOnlyFileStore.java | 20 +- .../file/preloader/SegmentPreloader.java | 368 ++++++++++++++++++ .../DelegatingPersistentCache.java | 54 +++ ...ersistentCachePreloadingConfiguration.java | 92 +++++ .../persistentcache/package-info.java | 2 +- .../file/preloader/SegmentPreloaderTest.java | 234 +++++++++++ 9 files changed, 874 insertions(+), 28 deletions(-) create mode 100644 oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java create mode 100644 oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/DelegatingPersistentCache.java create mode 100644 oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCachePreloadingConfiguration.java create mode 100644 oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java index 33a23b0e465..27b99454a0b 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java @@ -31,6 +31,8 @@ import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean; import org.apache.jackrabbit.oak.commons.Buffer; +import org.apache.jackrabbit.oak.commons.conditions.Validate; +import org.apache.jackrabbit.oak.commons.pio.Closer; import 
org.apache.jackrabbit.oak.segment.CachingSegmentReader; import org.apache.jackrabbit.oak.segment.RecordType; import org.apache.jackrabbit.oak.segment.Revisions; @@ -48,6 +50,7 @@ import org.apache.jackrabbit.oak.segment.SegmentStore; import org.apache.jackrabbit.oak.segment.SegmentTracker; import org.apache.jackrabbit.oak.segment.SegmentWriter; +import org.apache.jackrabbit.oak.segment.file.preloader.SegmentPreloader; import org.apache.jackrabbit.oak.segment.file.tar.EntryRecovery; import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration; import org.apache.jackrabbit.oak.segment.file.tar.TarFiles; @@ -55,6 +58,8 @@ import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor; import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitor; import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; +import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache; +import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCachePreloadingConfiguration; import org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.apache.jackrabbit.oak.stats.StatsOptions; import org.jetbrains.annotations.NotNull; @@ -89,6 +94,8 @@ public abstract class AbstractFileStore implements SegmentStore, Closeable { */ private static final int MAX_STORE_VERSION = 2; + protected final @Nullable PersistentCache persistentCache; + static ManifestChecker newManifestChecker(SegmentNodeStorePersistence persistence, boolean strictVersionCheck) throws IOException { return ManifestChecker.newManifestChecker( persistence.getManifestFile(), @@ -130,7 +137,7 @@ public void recoverEntry(UUID uuid, byte[] data, EntryRecovery entryRecovery) th protected final IOMonitor ioMonitor; protected final RemoteStoreMonitor remoteStoreMonitor; - + protected final int binariesInlineThreshold; AbstractFileStore(final FileStoreBuilder builder) { @@ -156,6 +163,15 @@ public SegmentId newSegmentId(long msb, long lsb) { 
this.remoteStoreMonitor = builder.getRemoteStoreMonitor(); this.segmentBufferMonitor = new SegmentBufferMonitor(builder.getStatsProvider()); this.binariesInlineThreshold = builder.getBinariesInlineThreshold(); + PersistentCache persistentCache = builder.getPersistentCache(); + PersistentCachePreloadingConfiguration preloadingConfig = builder.getPreloadingConfiguration(); + if (preloadingConfig != null) { + Validate.checkState(persistentCache != null, + "PersistentCache must be configured when using a PersistentCachePreloadConfiguration"); + this.persistentCache = SegmentPreloader.decorate(persistentCache, preloadingConfig, this::getTarFiles); + } else { + this.persistentCache = persistentCache; + } } static SegmentNotFoundException asSegmentNotFoundException(Exception e, SegmentId id) { @@ -165,6 +181,8 @@ static SegmentNotFoundException asSegmentNotFoundException(Exception e, SegmentI return new SegmentNotFoundException(id, e); } + abstract TarFiles getTarFiles(); + @NotNull public CacheStatsMBean getSegmentCacheStats() { return segmentCache.getCacheStats(); @@ -192,7 +210,7 @@ public SegmentReader getReader() { public SegmentIdProvider getSegmentIdProvider() { return tracker; } - + public int getBinariesInlineThreshold() { return binariesInlineThreshold; } @@ -281,6 +299,25 @@ public void consume(int number, RecordType type, int offset) { return binaryReferences; } + @Override + public void close() { + doClose(); + System.gc(); // for any memory-mappings that are no longer used + log.info("TarMK closed: {}", directory); + } + + protected void doClose() { + Closer closer = Closer.create(); + registerCloseables(closer); + closeAndLogOnFail(closer); + } + + protected void registerCloseables(Closer closer) { + if (persistentCache instanceof Closeable) { + closer.register((Closeable) persistentCache); + } + } + static void closeAndLogOnFail(Closeable closeable) { if (closeable != null) { try { @@ -292,11 +329,19 @@ static void closeAndLogOnFail(Closeable closeable) { 
} } - Segment readSegmentUncached(TarFiles tarFiles, SegmentId id) { - Buffer buffer = tarFiles.readSegment(id.getMostSignificantBits(), id.getLeastSignificantBits()); + Segment readSegmentUncached(SegmentId id) { + Buffer buffer; + if (persistentCache != null) { + buffer = persistentCache.readSegment(id.getMostSignificantBits(), id.getLeastSignificantBits(), + () -> getTarFiles().readSegment(id.getMostSignificantBits(), id.getLeastSignificantBits())); + } else { + buffer = getTarFiles().readSegment(id.getMostSignificantBits(), id.getLeastSignificantBits()); + } + if (buffer == null) { throw new SegmentNotFoundException(id); } + segmentBufferMonitor.trackAllocation(buffer); return new Segment(tracker, id, buffer); } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java index e933b5611e8..01776478b6b 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java @@ -249,6 +249,11 @@ FileStore bind(TarRevisions revisions) throws IOException { } } + @Override + TarFiles getTarFiles() { + return tarFiles; + } + @NotNull private Supplier initialNode() { return new Supplier() { @@ -483,21 +488,23 @@ public void close() { log.warn("Unable to flush the store", e); } - Closer closer = Closer.create(); - closer.register(repositoryLock::unlock); - closer.register(tarFiles) ; - closer.register(revisions); - - closeAndLogOnFail(closer); + doClose(); } // Try removing pending files in case the scheduler didn't have a chance to run yet System.gc(); // for any memory-mappings that are no longer used fileReaper.reap(); - log.info("TarMK closed: {}", directory); } + @Override + protected void registerCloseables(Closer closer) { + closer.register(repositoryLock::unlock); + closer.register(tarFiles) ; + closer.register(revisions); + 
super.registerCloseables(closer); + } + @Override public boolean containsSegment(SegmentId id) { try (ShutDownCloser ignored = shutDown.keepAlive()) { @@ -509,7 +516,7 @@ public boolean containsSegment(SegmentId id) { @NotNull public Segment readSegment(final SegmentId id) { try (ShutDownCloser ignored = shutDown.keepAlive()) { - return segmentCache.getSegment(id, () -> readSegmentUncached(tarFiles, id)); + return segmentCache.getSegment(id, () -> readSegmentUncached(id)); } catch (ExecutionException | UncheckedExecutionException e) { if (e.getCause() instanceof RepositoryNotReachableException) { RepositoryNotReachableException re = (RepositoryNotReachableException) e.getCause(); @@ -573,6 +580,10 @@ public void writeSegment(SegmentId id, byte[] buffer, int offset, int length) th if (!eagerSegmentCaching && segment != null) { segmentCache.putSegment(segment); } + + if (persistentCache != null) { + persistentCache.writeSegment(id.getMostSignificantBits(), id.getLeastSignificantBits(), Buffer.wrap(buffer, offset, length)); + } } } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java index fd390022321..04e68ae6f55 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java @@ -48,6 +48,8 @@ import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence; import org.apache.jackrabbit.oak.segment.spi.monitor.*; import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; +import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache; +import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCachePreloadingConfiguration; import org.apache.jackrabbit.oak.segment.tool.iotrace.IOTraceLogWriter; import 
org.apache.jackrabbit.oak.segment.tool.iotrace.IOTraceMonitor; import org.apache.jackrabbit.oak.spi.blob.BlobStore; @@ -94,7 +96,7 @@ public class FileStoreBuilder { private boolean memoryMapping = MEMORY_MAPPING_DEFAULT; private boolean offHeapAccess = getBoolean("access.off.heap"); - + private int binariesInlineThreshold = Segment.MEDIUM_LIMIT; private SegmentNodeStorePersistence persistence; @@ -108,6 +110,12 @@ public class FileStoreBuilder { @Nullable private EvictingWriteCacheManager cacheManager; + @Nullable + private PersistentCache persistentCache; + + @Nullable + private PersistentCachePreloadingConfiguration preloadingConfig; + private class FileStoreGCListener extends DelegatingGCMonitor implements GCListener { @Override public void compactionSucceeded(@NotNull GCGeneration newGeneration) { @@ -256,6 +264,34 @@ public FileStoreBuilder withNodeDeduplicationCacheSize(int nodeDeduplicationCach return this; } + /** + * The {@link PersistentCache} instance to be used, if any. By default no {@code PersistentCache} + * is configured. + * + * @param persistentCache the persistent cache + * @return this instance + */ + @NotNull + public FileStoreBuilder withPersistentCache(@NotNull PersistentCache persistentCache) { + this.persistentCache = requireNonNull(persistentCache); + return this; + } + + /** + * The {@link PersistentCachePreloadingConfiguration} to be used for preloading segments. + * This configuration must be used together with a {@link PersistentCache}, configured + * via {@link #withPersistentCache(PersistentCache)}. By default, no segment preloading + * is configured. 
+ * + * @param preloadingConfig the configuration for persistent cache preloading + * @return this instance + */ + @NotNull + public FileStoreBuilder withPersistentCachePreloading(@NotNull PersistentCachePreloadingConfiguration preloadingConfig) { + this.preloadingConfig = requireNonNull(preloadingConfig); + return this; + } + /** * Turn memory mapping on or off * @@ -397,7 +433,7 @@ public FileStoreBuilder withEagerSegmentCaching(boolean eagerSegmentCaching) { this.eagerSegmentCaching = eagerSegmentCaching; return this; } - + /** * Sets the threshold under which binaries are inlined in data segments. * @param binariesInlineThreshold the threshold @@ -585,11 +621,19 @@ boolean getStrictVersionCheck() { boolean getEagerSegmentCaching() { return eagerSegmentCaching; } - + int getBinariesInlineThreshold() { return binariesInlineThreshold; } + @Nullable PersistentCache getPersistentCache() { + return persistentCache; + } + + @Nullable PersistentCachePreloadingConfiguration getPreloadingConfiguration() { + return preloadingConfig; + } + @Override public String toString() { return "FileStoreBuilder{" + @@ -607,6 +651,8 @@ public String toString() { ", memoryMapping=" + memoryMapping + ", offHeapAccess=" + offHeapAccess + ", gcOptions=" + gcOptions + + ", persistentCache=" + persistentCache + + ", persistentCachePreloadingConfiguration=" + preloadingConfig + '}'; } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java index 5037f172ffe..a74e1a4f0d6 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java @@ -27,7 +27,6 @@ import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import 
java.util.function.Consumer; @@ -94,6 +93,11 @@ ReadOnlyFileStore bind(@NotNull ReadOnlyRevisions revisions) throws IOException return this; } + @Override + TarFiles getTarFiles() { + return tarFiles; + } + /** * Go to the specified {@code revision} * @@ -120,25 +124,17 @@ public boolean containsSegment(SegmentId id) { @NotNull public Segment readSegment(final SegmentId id) { try { - return segmentCache.getSegment(id, new Callable() { - @Override - public Segment call() throws Exception { - return readSegmentUncached(tarFiles, id); - } - }); + return segmentCache.getSegment(id, () -> readSegmentUncached(id)); } catch (ExecutionException | UncheckedExecutionException e) { throw asSegmentNotFoundException(e, id); } } @Override - public void close() { - Closer closer = Closer.create(); + protected void registerCloseables(Closer closer) { closer.register(tarFiles); closer.register(revisions); - closeAndLogOnFail(closer); - System.gc(); // for any memory-mappings that are no longer used - log.info("TarMK closed: {}", directory); + super.registerCloseables(closer); } @NotNull diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java new file mode 100644 index 00000000000..efa36ae89a3 --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java @@ -0,0 +1,368 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.segment.file.preloader; + +import org.apache.jackrabbit.oak.commons.Buffer; +import org.apache.jackrabbit.oak.commons.internal.function.Suppliers; +import org.apache.jackrabbit.oak.segment.SegmentId; +import org.apache.jackrabbit.oak.segment.SegmentNotFoundException; +import org.apache.jackrabbit.oak.segment.file.tar.TarFiles; +import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.DelegatingPersistentCache; +import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache; +import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCachePreloadingConfiguration; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Comparator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import static org.apache.jackrabbit.oak.commons.conditions.Validate.checkArgument; + +/** + * A {@link 
PersistentCache} decorator that preloads segments into the cache by + * asynchronously prefetching segments referenced by a segment that is being read + * from the cache. + * + * @see PersistentCachePreloadingConfiguration + */ +public class SegmentPreloader extends DelegatingPersistentCache implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(SegmentPreloader.class); + + private final Map inProgressPrefetch; + + private final ConcurrentHashMap>> graphCache; + + private final Set cachedSegments; + + private final PersistentCache delegate; + + private final ExecutorService dispatchPool; + + private final ExecutorService prefetchPool; + + private final int prefetchDepth; + + private final Supplier tarFiles; + + /** + * Factory method that decorates the given {@link PersistentCache} with a + * {@link SegmentPreloader} if the given configuration requires preloading. + * Otherwise, the given {@code delegate} is returned as-is. + * + * @param delegate the cache to decorate + * @param config the preloading configuration + * @param tarFiles a supplier of the {@link TarFiles} instance used to read segments and segment graphs + * @return the decorated cache or the given {@code delegate} if no preloading is configured + */ + public static @NotNull PersistentCache decorate(@NotNull PersistentCache delegate, @NotNull PersistentCachePreloadingConfiguration config, @NotNull Supplier tarFiles) { + if (config.getConcurrency() > 0 && config.getPrefetchDepth() > 0) { + return new SegmentPreloader(delegate, config, tarFiles); + } + return delegate; + } + + private SegmentPreloader(@NotNull PersistentCache delegate, @NotNull PersistentCachePreloadingConfiguration config, @NotNull Supplier tarFiles) { + this.delegate = delegate; + this.tarFiles = Suppliers.memoize(tarFiles); + this.inProgressPrefetch = new ConcurrentHashMap<>(); + this.graphCache = new ConcurrentHashMap<>(); + this.cachedSegments = ConcurrentHashMap.newKeySet(); + this.prefetchDepth = 
config.getPrefetchDepth(); + this.dispatchPool = new ThreadPoolExecutor(1,1, + 1, TimeUnit.SECONDS, + new PriorityBlockingQueue<>(), + r -> new Thread(r, "segment-prefetch-dispatcher")) { + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + clearInProgressTask(r); + } + }; + int prefetchThreads = config.getConcurrency(); + this.prefetchPool = new ThreadPoolExecutor(Math.max(1, prefetchThreads / 4), prefetchThreads, + 10, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(prefetchThreads * 4), + r -> { + String threadName = String.format("segment-prefetch-%s", Long.toHexString(System.nanoTime() & 0xFFFFF)); + Thread thread = new Thread(r, threadName); + thread.setUncaughtExceptionHandler((t, e) -> { + if (!(e instanceof InterruptedException)) { + LOG.warn("Uncaught exception in thread {}", t.getName(), e); + } + }); + return thread; + }, + (r, executor) -> { + try { + // force the caller thread to wait for space in the queue (this is always a thread in the dispatchPool) + // this creates back-pressure to the dispatchPool, slowing down the dispatching of new prefetch tasks + executor.getQueue().put(r); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }) { + + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + clearInProgressTask(r); + } + }; + } + + @Override + protected PersistentCache delegate() { + return delegate; + } + + @Override + public @Nullable Buffer readSegment(long msb, long lsb, @NotNull Callable loader) { + dispatch(tarFiles.get(), msb, lsb); + return delegate().readSegment(msb, lsb, loader); + } + + private void dispatch(@NotNull TarFiles tarFiles, long msb, long lsb) { + dispatch(tarFiles, tarFiles::getIndices, msb, lsb, 0); + } + + private void dispatch(@NotNull TarFiles tarFiles, Supplier>> indicesSupplier, long msb, long lsb, int depth) { + execute(dispatchPool, new PrefetchDispatchTask(tarFiles, indicesSupplier, msb, lsb, 
depth)); + } + + private void prefetch(TarFiles tarFiles, Supplier>> indicesSupplier, long msb, long lsb, int depth) { + execute(prefetchPool, new PrefetchTask(tarFiles, indicesSupplier, msb, lsb, depth)); + } + + private void execute(ExecutorService pool, Runnable r) { + if (registerInProgressTask(r)) { + pool.execute(r); + } + } + + private boolean registerInProgressTask(Runnable r) { + return inProgressPrefetch.putIfAbsent(r.hashCode(), Thread.currentThread().getName()) == null; + } + + private void clearInProgressTask(Runnable r) { + inProgressPrefetch.remove(r.hashCode()); + } + + @VisibleForTesting + boolean hasInProgressTasks() { + return !inProgressPrefetch.isEmpty(); + } + + @Override + public void close() { + try { + prefetchPool.shutdown(); + dispatchPool.shutdown(); + if (!prefetchPool.awaitTermination(4, TimeUnit.SECONDS)) { + prefetchPool.shutdownNow(); + } + if (!dispatchPool.awaitTermination(1, TimeUnit.SECONDS)) { + dispatchPool.shutdownNow(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + prefetchPool.shutdownNow(); + dispatchPool.shutdownNow(); + } + } + + private class PrefetchDispatchTask implements Runnable, Comparable { + + private final TarFiles tarFiles; + + private final Supplier>> indicesSupplier; + + private final long msb; + + private final long lsb; + + private final int depth; + + private final long creationTime = System.nanoTime(); + + PrefetchDispatchTask(@NotNull TarFiles tarFiles, Supplier>> indicesSupplier, long msb, long lsb, int depth) { + checkArgument(depth < prefetchDepth, "depth must be < %d, is %d", prefetchDepth, depth); + this.tarFiles = tarFiles; + this.indicesSupplier = indicesSupplier; + this.msb = msb; + this.lsb = lsb; + this.depth = depth + 1; + LOG.debug("Created: {}", this); + } + + @Override + public void run() { + LOG.debug("Running: {}", this); + UUID uuid = new UUID(msb, lsb); + Map> indices = indicesSupplier.get(); + String archiveName = indices.entrySet().stream() + 
.filter(entry -> entry.getValue().contains(uuid)) + .findFirst() + .map(Map.Entry::getKey) + .orElse(null); + + Map> graph = graphCache.computeIfAbsent(archiveName, name -> { + try { + return tarFiles.getGraph(name); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + + for (UUID reference : graph.get(uuid)) { + long refMsb = reference.getMostSignificantBits(); + long refLsb = reference.getLeastSignificantBits(); + if (!cachedSegments.contains(reference) && !delegate.containsSegment(refMsb, refLsb)) { + prefetch(tarFiles, () -> indices, refMsb, refLsb, depth); + } else if (depth < prefetchDepth - 1 && SegmentId.isDataSegmentId(refLsb)) { + dispatch(tarFiles, () -> indices, refMsb, refLsb, depth); + } + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o.getClass() == PrefetchDispatchTask.class)) { + return false; + } + PrefetchDispatchTask that = (PrefetchDispatchTask) o; + return msb == that.msb && lsb == that.lsb && depth == that.depth; + } + + @Override + public int hashCode() { + return Objects.hash(getClass(), msb, lsb, depth); + } + + @Override + public String toString() { + return "PrefetchDispatchTask{segmentId=" + new UUID(msb, lsb) + ", depth=" + depth + '}'; + } + + private int getPrefetchDepth() { + return depth; + } + + private long getCreationTime() { + return creationTime; + } + + @Override + public int compareTo(@NotNull SegmentPreloader.PrefetchDispatchTask o) { + return Comparator + .comparing(PrefetchDispatchTask::getPrefetchDepth) + .thenComparing(PrefetchDispatchTask::getCreationTime) + .compare(this, o); + } + } + + private class PrefetchTask implements Runnable { + + private final TarFiles tarFiles; + + private final Supplier>> indicesSupplier; + + private final long msb; + + private final long lsb; + + private final int depth; + + PrefetchTask(TarFiles tarFiles, Supplier>> indicesSupplier, long msb, long lsb, int depth) { + checkArgument(depth <= prefetchDepth, 
"depth must be <= %d, is %d", prefetchDepth, depth); + this.tarFiles = tarFiles; + this.indicesSupplier = indicesSupplier; + this.msb = msb; + this.lsb = lsb; + this.depth = depth; + LOG.debug("Created: {}", this); + } + + @Override + public void run() { + LOG.debug("Running: {}", this); + try { + if (depth < prefetchDepth && SegmentId.isDataSegmentId(lsb)) { + dispatch(tarFiles, indicesSupplier, msb, lsb, depth); + } + UUID uuid = new UUID(msb, lsb); + if (!cachedSegments.contains(uuid) && !delegate.containsSegment(msb, lsb)) { + Buffer segmentBuffer = tarFiles.readSegment(msb, lsb); + if (segmentBuffer != null) { + cachedSegments.add(uuid); + delegate.writeSegment(msb, lsb, segmentBuffer); + } + } + } catch (SegmentNotFoundException e) { + LOG.warn("SegmentNotFoundException during prefetch of segment {}", new UUID(msb, lsb), e); + throw e; + } catch (Exception e) { + LOG.warn("Exception during prefetch of segment {}", new UUID(msb, lsb), e); + throw new RuntimeException(e); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o.getClass() == PrefetchTask.class)) { + return false; + } + PrefetchTask that = (PrefetchTask) o; + return msb == that.msb && lsb == that.lsb; + } + + @Override + public int hashCode() { + return Objects.hash(getClass(), msb, lsb); + } + + @Override + public String toString() { + return "PrefetchTask{segmentId=" + new UUID(msb, lsb) + ", depth=" + depth + '}'; + } + } +} diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/DelegatingPersistentCache.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/DelegatingPersistentCache.java new file mode 100644 index 00000000000..4895e4baa9b --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/DelegatingPersistentCache.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache; + +import org.apache.jackrabbit.oak.commons.Buffer; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.util.concurrent.Callable; + +/** + * Simple abstract implementation of a delegating PersistentCache that can be + * used as a base class for {@link PersistentCache} decorators. 
+ */ +public abstract class DelegatingPersistentCache implements PersistentCache { + + protected abstract PersistentCache delegate(); + + @Override + public @Nullable Buffer readSegment(long msb, long lsb, @NotNull Callable loader) { + return delegate().readSegment(msb, lsb, loader); + } + + @Override + public boolean containsSegment(long msb, long lsb) { + return delegate().containsSegment(msb, lsb); + } + + @Override + public void writeSegment(long msb, long lsb, Buffer buffer) { + delegate().writeSegment(msb, lsb, buffer); + } + + @Override + public void cleanUp() { + delegate().cleanUp(); + } +} diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCachePreloadingConfiguration.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCachePreloadingConfiguration.java new file mode 100644 index 00000000000..02d576ffe30 --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCachePreloadingConfiguration.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache; + +import java.util.concurrent.Callable; + +/** + * Configuration for a segment prefetch mechanism that preloads segments into a + * {@link PersistentCache}. The prefetch mechanism is triggered whenever a segment + * in the cache is {@link PersistentCache#readSegment(long, long, Callable) accessed}. + * When this happens, all segments referenced by the accessed segment are asynchronously + * prefetched. + *

+ Next to the concurrency level, i.e. how many threads are used for prefetching, the + {@code prefetchDepth} (default: {@code 1}), which controls how many recursive levels + of referenced segments are prefetched, can be configured. + *

+ * Prefetching is done asynchronously, but it may add some overhead. It is primarily + * recommended to parallelize slow I/O, e.g. when using a remote persistence. + *

+ Different scenarios may warrant different prefetching strategies. A short-lived + process traversing a repository (e.g. copy, offline-compaction) with an initially + empty cache may benefit from more threads and a higher prefetch-depth, while a + long-running process, e.g. a web application, may perform better with fewer threads + and a lower prefetch depth. + */ +public class PersistentCachePreloadingConfiguration { + + private final int concurrency; + + private int prefetchDepth; + + private PersistentCachePreloadingConfiguration(int concurrency, int prefetchDepth) { + this.concurrency = concurrency; + this.prefetchDepth = prefetchDepth; + } + + /** + * Creates a new {@link PersistentCachePreloadingConfiguration} with the given concurrency + * level and a {@code prefetchDepth} of {@code 1}. + * + * @param concurrency number of threads to use for prefetching + * @return a new configuration + */ + public static PersistentCachePreloadingConfiguration withConcurrency(int concurrency) { + return new PersistentCachePreloadingConfiguration(concurrency, 1); + } + + /** + * Set how many recursive levels of referenced segments should be prefetched. + * + * @param prefetchDepth depth of the prefetching, i.e. 
how many levels of referenced + * segments should be prefetched (default: {@code 1}) + * @return this configuration + */ + public PersistentCachePreloadingConfiguration withPrefetchDepth(int prefetchDepth) { + this.prefetchDepth = prefetchDepth; + return this; + } + + public int getConcurrency() { + return concurrency; + } + + public int getPrefetchDepth() { + return prefetchDepth; + } + + @Override + public String toString() { + return "PersistentCachePreloadingConfiguration{" + + "concurrency=" + concurrency + + ", prefetchDepth=" + prefetchDepth + + '}'; + } +} diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/package-info.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/package-info.java index db4c23c9902..1573faf0346 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/package-info.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/package-info.java @@ -15,7 +15,7 @@ * limitations under the License. */ @Internal(since = "1.0.0") -@Version("5.0.0") +@Version("5.1.0") package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache; import org.apache.jackrabbit.oak.commons.annotations.Internal; diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java new file mode 100644 index 00000000000..e5f9be0d9fe --- /dev/null +++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.segment.file.preloader; + +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.RandomUtils; +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.commons.Buffer; +import org.apache.jackrabbit.oak.segment.SegmentNodeStore; +import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; +import org.apache.jackrabbit.oak.segment.file.FileStore; +import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder; +import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException; +import org.apache.jackrabbit.oak.segment.file.tar.SegmentGraph; +import org.apache.jackrabbit.oak.segment.file.tar.TarFiles; +import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence; +import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter; +import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitorAdapter; +import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileReader; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; +import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache; +import 
org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCachePreloadingConfiguration; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +public class SegmentPreloaderTest { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(new File("target")); + + @Test + public void testDecorationSkippedForWrongArguments() throws IOException { + Supplier tarFiles = () -> null; // never called + PersistentCache delegate = new MemoryTestCache(); + PersistentCache decorated = SegmentPreloader.decorate(delegate, PersistentCachePreloadingConfiguration.withConcurrency(0), tarFiles); + assertSame(delegate, decorated); + } + + @Test + public void testPreloading() throws IOException, InvalidFileStoreVersionException, CommitFailedException, InterruptedException { + SegmentNodeStorePersistence persistence = new TarPersistence(folder.getRoot()); + try (FileStore fileStore = FileStoreBuilder.fileStoreBuilder(folder.getRoot()) + .withCustomPersistence(persistence) + .build()) { + SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build(); + NodeBuilder builder = 
nodeStore.getRoot().builder(); + + generateContent(builder, 4, 8); + nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + } + + MemoryTestCache underlyingCache = new MemoryTestCache(); + TarFiles tarFiles = createReadOnlyTarFiles(folder.getRoot(), persistence); + + SegmentPreloader preloadingCache = (SegmentPreloader)SegmentPreloader.decorate(underlyingCache, + PersistentCachePreloadingConfiguration.withConcurrency(8).withPrefetchDepth(2), () -> tarFiles); + + SegmentArchiveManager archiveManager = persistence.createArchiveManager(false, false, null, null, null); + assertEquals(List.of("data00000a.tar"), archiveManager.listArchives()); + try (@Nullable SegmentArchiveReader reader = archiveManager.open("data00000a.tar"); + JournalFileReader journalFileReader = persistence.getJournalFile().openJournalReader()) { + assertNotNull(reader); + + String line = journalFileReader.readLine(); + String[] parts = line.split(":"); + UUID root = UUID.fromString(parts[0]); + + SegmentGraph graph = reader.getGraph(); + Set referencedSegments = collectReferencedSegments(root, graph, 2); + for (UUID segment : referencedSegments) { + assertFalse(underlyingCache.containsSegment(segment.getMostSignificantBits(), segment.getLeastSignificantBits())); + assertFalse(preloadingCache.containsSegment(segment.getMostSignificantBits(), segment.getLeastSignificantBits())); + } + + preloadingCache.readSegment(root.getMostSignificantBits(), root.getLeastSignificantBits(), + () -> tarFiles.readSegment(root.getMostSignificantBits(), root.getLeastSignificantBits())); + + // wait for preloading to complete + while (preloadingCache.hasInProgressTasks()) { + TimeUnit.MILLISECONDS.sleep(50); + } + + for (UUID segment : referencedSegments) { + assertTrue("Segment missing in underlying cache: " + segment, + underlyingCache.containsSegment(segment.getMostSignificantBits(), segment.getLeastSignificantBits())); + assertTrue("Segment missing in preloading cache: " + segment, + 
preloadingCache.containsSegment(segment.getMostSignificantBits(), segment.getLeastSignificantBits())); + } + assertEquals(referencedSegments.size(), underlyingCache.segments.size()); + + UUID nextToLoad = null; + Set uuids = null; + for (UUID referencedSegment : referencedSegments) { + uuids = collectReferencedSegments(referencedSegment, graph, 2); + uuids.removeAll(referencedSegments); + if (!uuids.isEmpty()) { + nextToLoad = referencedSegment; + } + } + + assertNotNull(uuids); + assertNotNull(nextToLoad); + + final UUID next = nextToLoad; + preloadingCache.readSegment(next.getMostSignificantBits(), next.getLeastSignificantBits(), + () -> tarFiles.readSegment(next.getMostSignificantBits(), next.getLeastSignificantBits())); + + // wait for preloading to complete + while (preloadingCache.hasInProgressTasks()) { + TimeUnit.MILLISECONDS.sleep(50); + } + + preloadingCache.close(); + + for (UUID segment : uuids) { + assertTrue("Segment missing in underlying cache: " + segment, + underlyingCache.containsSegment(segment.getMostSignificantBits(), segment.getLeastSignificantBits())); + assertTrue("Segment missing in preloading cache: " + segment, + preloadingCache.containsSegment(segment.getMostSignificantBits(), segment.getLeastSignificantBits())); + } + } + } + + private TarFiles createReadOnlyTarFiles(File directory, SegmentNodeStorePersistence persistence) throws IOException { + return TarFiles.builder() + .withDirectory(directory) + .withPersistence(persistence) + .withReadOnly() + .withIOMonitor(new IOMonitorAdapter()) + .withRemoteStoreMonitor(new RemoteStoreMonitorAdapter()) + .withTarRecovery((uuid, data, entryRecovery) -> { + throw new UnsupportedOperationException(); + }) + .build(); + } + + private static Set collectReferencedSegments(UUID root, SegmentGraph graph, int depth) throws IOException { + Set uuids = new LinkedHashSet<>(); + uuids.add(root); + if (depth > 0) { + for (UUID edge : graph.getEdges(root)) { + uuids.addAll(collectReferencedSegments(edge, 
graph, depth - 1)); + } + } + return uuids; + } + + private void generateContent(NodeBuilder builder, int childNodes, int depth) { + RandomUtils r = RandomUtils.insecure(); + RandomStringUtils random = RandomStringUtils.insecure(); + for (int i = 0; i < childNodes; i++) { + NodeBuilder child = builder.child(random.nextAlphabetic(40, 60)); + child.setProperty("jcr:primaryType", random.nextAlphabetic(4)); + child.setProperty(random.nextAlphabetic(30, 40), r.randomBoolean() ? random.nextAlphabetic(100, 150) : r.randomLong()); + if (depth > 1) { + generateContent(child, childNodes, depth - 1); + } + } + } + + private static class MemoryTestCache implements PersistentCache { + + Map segments = new ConcurrentHashMap<>(); + + @Override + public @Nullable Buffer readSegment(long msb, long lsb, @NotNull Callable loader) { + return segments.computeIfAbsent(lsb, i -> { + try { + return loader.call(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Override + public boolean containsSegment(long msb, long lsb) { + return segments.containsKey(lsb); + } + + @Override + public void writeSegment(long msb, long lsb, Buffer buffer) { + segments.put(lsb, buffer); + } + + @Override + public void cleanUp() { + segments.clear(); + } + } + +} From 70e124d20e98c0cfd30b5e80454956903b5527d5 Mon Sep 17 00:00:00 2001 From: Julian Sedding Date: Fri, 10 Oct 2025 14:03:50 +0200 Subject: [PATCH 2/5] OAK-11934 - segment preloading for PersistentCache - fix unit-test - address some sonar warnings --- .../oak/segment/file/AbstractFileStore.java | 10 +- .../file/preloader/SegmentPreloader.java | 48 +++---- .../file/preloader/SegmentPreloaderTest.java | 124 +++++++++++------- 3 files changed, 100 insertions(+), 82 deletions(-) diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java index 27b99454a0b..0d570fe145b 100644 --- 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.UUID; import java.util.function.Consumer; +import java.util.function.Supplier; import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean; import org.apache.jackrabbit.oak.commons.Buffer; @@ -163,15 +164,18 @@ public SegmentId newSegmentId(long msb, long lsb) { this.remoteStoreMonitor = builder.getRemoteStoreMonitor(); this.segmentBufferMonitor = new SegmentBufferMonitor(builder.getStatsProvider()); this.binariesInlineThreshold = builder.getBinariesInlineThreshold(); + this.persistentCache = initializePersistentCache(builder, this::getTarFiles); + } + + private static @Nullable PersistentCache initializePersistentCache(FileStoreBuilder builder, Supplier tarFilesSupplier) { PersistentCache persistentCache = builder.getPersistentCache(); PersistentCachePreloadingConfiguration preloadingConfig = builder.getPreloadingConfiguration(); if (preloadingConfig != null) { Validate.checkState(persistentCache != null, "PersistentCache must be configured when using a PersistentCachePreloadConfiguration"); - this.persistentCache = SegmentPreloader.decorate(persistentCache, preloadingConfig, this::getTarFiles); - } else { - this.persistentCache = persistentCache; + persistentCache = SegmentPreloader.decorate(persistentCache, preloadingConfig, tarFilesSupplier); } + return persistentCache; } static SegmentNotFoundException asSegmentNotFoundException(Exception e, SegmentId id) { diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java index efa36ae89a3..2ef1405a801 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java +++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java @@ -66,8 +66,6 @@ public class SegmentPreloader extends DelegatingPersistentCache implements Close private final ConcurrentHashMap>> graphCache; - private final Set cachedSegments; - private final PersistentCache delegate; private final ExecutorService dispatchPool; @@ -100,7 +98,6 @@ private SegmentPreloader(@NotNull PersistentCache delegate, @NotNull PersistentC this.tarFiles = Suppliers.memoize(tarFiles); this.inProgressPrefetch = new ConcurrentHashMap<>(); this.graphCache = new ConcurrentHashMap<>(); - this.cachedSegments = ConcurrentHashMap.newKeySet(); this.prefetchDepth = config.getPrefetchDepth(); this.dispatchPool = new ThreadPoolExecutor(1,1, 1, TimeUnit.SECONDS, @@ -163,10 +160,6 @@ private void dispatch(@NotNull TarFiles tarFiles, Supplier execute(dispatchPool, new PrefetchDispatchTask(tarFiles, indicesSupplier, msb, lsb, depth)); } - private void prefetch(TarFiles tarFiles, Supplier>> indicesSupplier, long msb, long lsb, int depth) { - execute(prefetchPool, new PrefetchTask(tarFiles, indicesSupplier, msb, lsb, depth)); - } - private void execute(ExecutorService pool, Runnable r) { if (registerInProgressTask(r)) { pool.execute(r); @@ -181,11 +174,6 @@ private void clearInProgressTask(Runnable r) { inProgressPrefetch.remove(r.hashCode()); } - @VisibleForTesting - boolean hasInProgressTasks() { - return !inProgressPrefetch.isEmpty(); - } - @Override public void close() { try { @@ -250,20 +238,24 @@ public void run() { for (UUID reference : graph.get(uuid)) { long refMsb = reference.getMostSignificantBits(); long refLsb = reference.getLeastSignificantBits(); - if (!cachedSegments.contains(reference) && !delegate.containsSegment(refMsb, refLsb)) { + if (!delegate.containsSegment(refMsb, refLsb)) { prefetch(tarFiles, () -> indices, refMsb, refLsb, depth); - } else if (depth < prefetchDepth - 1 && SegmentId.isDataSegmentId(refLsb)) { + } else if (depth < 
prefetchDepth && SegmentId.isDataSegmentId(refLsb)) { dispatch(tarFiles, () -> indices, refMsb, refLsb, depth); } } } + private void prefetch(TarFiles tarFiles, Supplier>> indicesSupplier, long msb, long lsb, int depth) { + execute(prefetchPool, new PrefetchTask(tarFiles, indicesSupplier, msb, lsb, depth)); + } + @Override public boolean equals(Object o) { if (this == o) { return true; } - if (!(o.getClass() == PrefetchDispatchTask.class)) { + if (o.getClass() != PrefetchDispatchTask.class) { return false; } PrefetchDispatchTask that = (PrefetchDispatchTask) o; @@ -322,24 +314,14 @@ private class PrefetchTask implements Runnable { @Override public void run() { LOG.debug("Running: {}", this); - try { - if (depth < prefetchDepth && SegmentId.isDataSegmentId(lsb)) { - dispatch(tarFiles, indicesSupplier, msb, lsb, depth); - } - UUID uuid = new UUID(msb, lsb); - if (!cachedSegments.contains(uuid) && !delegate.containsSegment(msb, lsb)) { - Buffer segmentBuffer = tarFiles.readSegment(msb, lsb); - if (segmentBuffer != null) { - cachedSegments.add(uuid); - delegate.writeSegment(msb, lsb, segmentBuffer); - } + if (depth < prefetchDepth && SegmentId.isDataSegmentId(lsb)) { + dispatch(tarFiles, indicesSupplier, msb, lsb, depth); + } + if (!delegate.containsSegment(msb, lsb)) { + Buffer segmentBuffer = tarFiles.readSegment(msb, lsb); + if (segmentBuffer != null) { + delegate.writeSegment(msb, lsb, segmentBuffer); } - } catch (SegmentNotFoundException e) { - LOG.warn("SegmentNotFoundException during prefetch of segment {}", new UUID(msb, lsb), e); - throw e; - } catch (Exception e) { - LOG.warn("Exception during prefetch of segment {}", new UUID(msb, lsb), e); - throw new RuntimeException(e); } } @@ -348,7 +330,7 @@ public boolean equals(Object o) { if (this == o) { return true; } - if (!(o.getClass() == PrefetchTask.class)) { + if (o.getClass() != PrefetchTask.class) { return false; } PrefetchTask that = (PrefetchTask) o; diff --git 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java index e5f9be0d9fe..da85394d9ca 100644 --- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java +++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java @@ -22,19 +22,18 @@ import org.apache.commons.lang3.RandomUtils; import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.commons.Buffer; +import org.apache.jackrabbit.oak.segment.Segment; +import org.apache.jackrabbit.oak.segment.SegmentId; import org.apache.jackrabbit.oak.segment.SegmentNodeStore; import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; import org.apache.jackrabbit.oak.segment.file.FileStore; import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder; import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException; -import org.apache.jackrabbit.oak.segment.file.tar.SegmentGraph; import org.apache.jackrabbit.oak.segment.file.tar.TarFiles; import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence; import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter; import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitorAdapter; import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileReader; -import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager; -import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader; import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache; import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCachePreloadingConfiguration; @@ -46,11 +45,15 @@ import org.junit.Rule; import 
org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.sql.Time; +import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashSet; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -67,21 +70,50 @@ public class SegmentPreloaderTest { + private static final Logger LOG = LoggerFactory.getLogger(SegmentPreloaderTest.class); + @Rule public TemporaryFolder folder = new TemporaryFolder(new File("target")); @Test - public void testDecorationSkippedForWrongArguments() throws IOException { + public void testDecorationSkippedForWrongArguments() { Supplier tarFiles = () -> null; // never called PersistentCache delegate = new MemoryTestCache(); PersistentCache decorated = SegmentPreloader.decorate(delegate, PersistentCachePreloadingConfiguration.withConcurrency(0), tarFiles); assertSame(delegate, decorated); } + @Test + public void viaFileStoreBuilder() throws InvalidFileStoreVersionException, IOException, CommitFailedException { + try (FileStore fileStore = FileStoreBuilder.fileStoreBuilder(folder.getRoot()) + .build()) { + SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build(); + NodeBuilder builder = nodeStore.getRoot().builder(); + + generateContent(builder, 4, 4); + nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + } + + MemoryTestCache persistentCache = new MemoryTestCache(); + try (FileStore fileStore = FileStoreBuilder.fileStoreBuilder(folder.getRoot()) + .withPersistentCache(persistentCache) + .withPersistentCachePreloading(PersistentCachePreloadingConfiguration.withConcurrency(4).withPrefetchDepth(1)) + .build()) { + SegmentId root = fileStore.getRevisions().getPersistedHead().getSegmentId(); + Segment segment = root.getSegment(); + int referencedSegmentIdCount = segment.getReferencedSegmentIdCount(); + + 
assertTrue(persistentCache.containsSegment(root.getMostSignificantBits(), root.getLeastSignificantBits())); + assertEquals(1 + referencedSegmentIdCount, persistentCache.segments.size()); + } + } + @Test public void testPreloading() throws IOException, InvalidFileStoreVersionException, CommitFailedException, InterruptedException { SegmentNodeStorePersistence persistence = new TarPersistence(folder.getRoot()); try (FileStore fileStore = FileStoreBuilder.fileStoreBuilder(folder.getRoot()) + .withMaxFileSize(4) + .withMemoryMapping(false) .withCustomPersistence(persistence) .build()) { SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build(); @@ -92,23 +124,18 @@ public void testPreloading() throws IOException, InvalidFileStoreVersionExceptio } MemoryTestCache underlyingCache = new MemoryTestCache(); - TarFiles tarFiles = createReadOnlyTarFiles(folder.getRoot(), persistence); - SegmentPreloader preloadingCache = (SegmentPreloader)SegmentPreloader.decorate(underlyingCache, - PersistentCachePreloadingConfiguration.withConcurrency(8).withPrefetchDepth(2), () -> tarFiles); - - SegmentArchiveManager archiveManager = persistence.createArchiveManager(false, false, null, null, null); - assertEquals(List.of("data00000a.tar"), archiveManager.listArchives()); - try (@Nullable SegmentArchiveReader reader = archiveManager.open("data00000a.tar"); + try (TarFiles tarFiles = createReadOnlyTarFiles(folder.getRoot(), persistence); + SegmentPreloader preloadingCache = (SegmentPreloader)SegmentPreloader.decorate(underlyingCache, + PersistentCachePreloadingConfiguration.withConcurrency(8).withPrefetchDepth(2), () -> tarFiles); JournalFileReader journalFileReader = persistence.getJournalFile().openJournalReader()) { - assertNotNull(reader); - String line = journalFileReader.readLine(); - String[] parts = line.split(":"); - UUID root = UUID.fromString(parts[0]); + UUID root = getRootUUID(journalFileReader); + + assertTrue(tarFiles.getIndices().size() > 2); + Map> 
graph = computeFullGraph(tarFiles); - SegmentGraph graph = reader.getGraph(); - Set referencedSegments = collectReferencedSegments(root, graph, 2); + Set referencedSegments = collectReferencedSegments(root, graph, 1); for (UUID segment : referencedSegments) { assertFalse(underlyingCache.containsSegment(segment.getMostSignificantBits(), segment.getLeastSignificantBits())); assertFalse(preloadingCache.containsSegment(segment.getMostSignificantBits(), segment.getLeastSignificantBits())); @@ -116,51 +143,56 @@ public void testPreloading() throws IOException, InvalidFileStoreVersionExceptio preloadingCache.readSegment(root.getMostSignificantBits(), root.getLeastSignificantBits(), () -> tarFiles.readSegment(root.getMostSignificantBits(), root.getLeastSignificantBits())); - - // wait for preloading to complete - while (preloadingCache.hasInProgressTasks()) { - TimeUnit.MILLISECONDS.sleep(50); - } - - for (UUID segment : referencedSegments) { - assertTrue("Segment missing in underlying cache: " + segment, - underlyingCache.containsSegment(segment.getMostSignificantBits(), segment.getLeastSignificantBits())); - assertTrue("Segment missing in preloading cache: " + segment, - preloadingCache.containsSegment(segment.getMostSignificantBits(), segment.getLeastSignificantBits())); - } - assertEquals(referencedSegments.size(), underlyingCache.segments.size()); + assertReferencedSegmentsLoaded(referencedSegments, underlyingCache, preloadingCache); UUID nextToLoad = null; Set uuids = null; for (UUID referencedSegment : referencedSegments) { uuids = collectReferencedSegments(referencedSegment, graph, 2); - uuids.removeAll(referencedSegments); + uuids.removeIf(uuid -> underlyingCache.containsSegment(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits())); if (!uuids.isEmpty()) { nextToLoad = referencedSegment; } } - assertNotNull(uuids); assertNotNull(nextToLoad); final UUID next = nextToLoad; preloadingCache.readSegment(next.getMostSignificantBits(), 
next.getLeastSignificantBits(), () -> tarFiles.readSegment(next.getMostSignificantBits(), next.getLeastSignificantBits())); + LOG.info("Next loaded segment: {}", next); + assertReferencedSegmentsLoaded(uuids, underlyingCache, preloadingCache); + } + } - // wait for preloading to complete - while (preloadingCache.hasInProgressTasks()) { - TimeUnit.MILLISECONDS.sleep(50); - } + private static @NotNull UUID getRootUUID(JournalFileReader journalFileReader) throws IOException { + String line = journalFileReader.readLine(); + String[] parts = line.split(":"); + return UUID.fromString(parts[0]); + } - preloadingCache.close(); + private void assertReferencedSegmentsLoaded(Set referencedSegments, MemoryTestCache underlyingCache, SegmentPreloader preloadingCache) throws InterruptedException { + long timeoutSec = 10; + long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(timeoutSec); + Set segments = new HashSet<>(referencedSegments); + while (!segments.isEmpty() && System.currentTimeMillis() < deadline) { + segments.removeIf(segment -> + underlyingCache.containsSegment(segment.getMostSignificantBits(), segment.getLeastSignificantBits()) + && preloadingCache.containsSegment(segment.getMostSignificantBits(), segment.getLeastSignificantBits())); + TimeUnit.MILLISECONDS.sleep(10); + } - for (UUID segment : uuids) { - assertTrue("Segment missing in underlying cache: " + segment, - underlyingCache.containsSegment(segment.getMostSignificantBits(), segment.getLeastSignificantBits())); - assertTrue("Segment missing in preloading cache: " + segment, - preloadingCache.containsSegment(segment.getMostSignificantBits(), segment.getLeastSignificantBits())); - } + assertEquals("Not all referenced segments have been preloaded within " + timeoutSec + " seconds", + Set.of(), segments); + } + + private static Map> computeFullGraph(TarFiles tarFiles) throws IOException { + Map> fullGraph = new HashMap<>(); + for (String archiveName : tarFiles.getIndices().keySet()) { + Map> 
graph = tarFiles.getGraph(archiveName); + fullGraph.putAll(graph); } + return fullGraph; } private TarFiles createReadOnlyTarFiles(File directory, SegmentNodeStorePersistence persistence) throws IOException { @@ -176,11 +208,11 @@ private TarFiles createReadOnlyTarFiles(File directory, SegmentNodeStorePersiste .build(); } - private static Set collectReferencedSegments(UUID root, SegmentGraph graph, int depth) throws IOException { + private static Set collectReferencedSegments(UUID root, Map> graph, int depth) throws IOException { Set uuids = new LinkedHashSet<>(); uuids.add(root); if (depth > 0) { - for (UUID edge : graph.getEdges(root)) { + for (UUID edge : graph.get(root)) { uuids.addAll(collectReferencedSegments(edge, graph, depth - 1)); } } From 92ad096b93e0adc95cd0db1417aed23a1884be7f Mon Sep 17 00:00:00 2001 From: Julian Sedding Date: Fri, 10 Oct 2025 15:04:38 +0200 Subject: [PATCH 3/5] OAK-11934 - segment preloading for PersistentCache - use Awaitility for unit test --- oak-segment-tar/pom.xml | 6 ++++ .../file/preloader/SegmentPreloaderTest.java | 33 +++++++++++-------- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/oak-segment-tar/pom.xml b/oak-segment-tar/pom.xml index eab0cfd6573..883efdd3cd0 100644 --- a/oak-segment-tar/pom.xml +++ b/oak-segment-tar/pom.xml @@ -369,6 +369,12 @@ org.apache.sling.testing.osgi-mock.junit4 test + + org.awaitility + awaitility + 4.2.1 + test + org.osgi org.osgi.service.event diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java index da85394d9ca..2fe4146c067 100644 --- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java +++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java @@ -40,6 +40,7 @@ import 
org.apache.jackrabbit.oak.spi.commit.CommitInfo; import org.apache.jackrabbit.oak.spi.commit.EmptyHook; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.awaitility.Awaitility; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.junit.Rule; @@ -50,7 +51,6 @@ import java.io.File; import java.io.IOException; -import java.sql.Time; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; @@ -84,7 +84,7 @@ public void testDecorationSkippedForWrongArguments() { } @Test - public void viaFileStoreBuilder() throws InvalidFileStoreVersionException, IOException, CommitFailedException { + public void viaFileStoreBuilder() throws InvalidFileStoreVersionException, IOException, CommitFailedException, InterruptedException { try (FileStore fileStore = FileStoreBuilder.fileStoreBuilder(folder.getRoot()) .build()) { SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build(); @@ -101,10 +101,13 @@ public void viaFileStoreBuilder() throws InvalidFileStoreVersionException, IOExc .build()) { SegmentId root = fileStore.getRevisions().getPersistedHead().getSegmentId(); Segment segment = root.getSegment(); - int referencedSegmentIdCount = segment.getReferencedSegmentIdCount(); + + int expectedCacheSize = 1 + segment.getReferencedSegmentIdCount(); + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals(expectedCacheSize, persistentCache.segments.size())); assertTrue(persistentCache.containsSegment(root.getMostSignificantBits(), root.getLeastSignificantBits())); - assertEquals(1 + referencedSegmentIdCount, persistentCache.segments.size()); } } @@ -172,18 +175,20 @@ public void testPreloading() throws IOException, InvalidFileStoreVersionExceptio } private void assertReferencedSegmentsLoaded(Set referencedSegments, MemoryTestCache underlyingCache, SegmentPreloader preloadingCache) throws InterruptedException { - long timeoutSec = 10; - long deadline = 
System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(timeoutSec); Set segments = new HashSet<>(referencedSegments); - while (!segments.isEmpty() && System.currentTimeMillis() < deadline) { - segments.removeIf(segment -> - underlyingCache.containsSegment(segment.getMostSignificantBits(), segment.getLeastSignificantBits()) - && preloadingCache.containsSegment(segment.getMostSignificantBits(), segment.getLeastSignificantBits())); - TimeUnit.MILLISECONDS.sleep(10); - } + int timeoutSec = 10; + Awaitility + .await() + .atMost(timeoutSec, TimeUnit.SECONDS) + .untilAsserted(() -> { + segments.removeIf(uuid -> + underlyingCache.containsSegment(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()) + && preloadingCache.containsSegment(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits())); + assertEquals("Not all referenced segments have been preloaded within " + timeoutSec + " seconds", + Set.of(), segments); + }); + - assertEquals("Not all referenced segments have been preloaded within " + timeoutSec + " seconds", - Set.of(), segments); } private static Map> computeFullGraph(TarFiles tarFiles) throws IOException { From 22ff2a65f7cfae4749eba5df0fba0decf3b8b192 Mon Sep 17 00:00:00 2001 From: Julian Sedding Date: Mon, 13 Oct 2025 09:45:28 +0200 Subject: [PATCH 4/5] OAK-11934 - segment preloading for PersistentCache - test coverage, consistent naming --- .../file/preloader/SegmentPreloader.java | 110 ++++++++-------- ...ersistentCachePreloadingConfiguration.java | 44 +++---- .../file/preloader/SegmentPreloaderTest.java | 118 +++++++++++++++++- 3 files changed, 196 insertions(+), 76 deletions(-) diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java index 2ef1405a801..13ca8089824 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java +++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java @@ -21,14 +21,12 @@ import org.apache.jackrabbit.oak.commons.Buffer; import org.apache.jackrabbit.oak.commons.internal.function.Suppliers; import org.apache.jackrabbit.oak.segment.SegmentId; -import org.apache.jackrabbit.oak.segment.SegmentNotFoundException; import org.apache.jackrabbit.oak.segment.file.tar.TarFiles; import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.DelegatingPersistentCache; import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache; import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCachePreloadingConfiguration; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import org.jetbrains.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +51,7 @@ /** * A {@link PersistentCache} decorator that preloads segments into the cache by - * asynchronously prefetching segments referenced by a segment that is being read + * asynchronously preloading segments referenced by a segment that is being read * from the cache. 
* * @see PersistentCachePreloadingConfiguration @@ -70,9 +68,9 @@ public class SegmentPreloader extends DelegatingPersistentCache implements Close private final ExecutorService dispatchPool; - private final ExecutorService prefetchPool; + private final ExecutorService preloadPool; - private final int prefetchDepth; + private final int preloadDepth; private final Supplier tarFiles; @@ -87,7 +85,7 @@ public class SegmentPreloader extends DelegatingPersistentCache implements Close * @return the decorated cache or the given {@code delegate} if no preloading is configured */ public static @NotNull PersistentCache decorate(@NotNull PersistentCache delegate, @NotNull PersistentCachePreloadingConfiguration config, @NotNull Supplier tarFiles) { - if (config.getConcurrency() > 0 && config.getPrefetchDepth() > 0) { + if (config.getConcurrency() > 0 && config.getMaxPreloadDepth() > 0) { return new SegmentPreloader(delegate, config, tarFiles); } return delegate; @@ -98,23 +96,23 @@ private SegmentPreloader(@NotNull PersistentCache delegate, @NotNull PersistentC this.tarFiles = Suppliers.memoize(tarFiles); this.inProgressPrefetch = new ConcurrentHashMap<>(); this.graphCache = new ConcurrentHashMap<>(); - this.prefetchDepth = config.getPrefetchDepth(); + this.preloadDepth = config.getMaxPreloadDepth(); this.dispatchPool = new ThreadPoolExecutor(1,1, 1, TimeUnit.SECONDS, new PriorityBlockingQueue<>(), - r -> new Thread(r, "segment-prefetch-dispatcher")) { + r -> new Thread(r, "segment-preload-dispatcher")) { @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); clearInProgressTask(r); } }; - int prefetchThreads = config.getConcurrency(); - this.prefetchPool = new ThreadPoolExecutor(Math.max(1, prefetchThreads / 4), prefetchThreads, - 10, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(prefetchThreads * 4), + int preloadThreads = config.getConcurrency(); + ThreadPoolExecutor preloadPool = new ThreadPoolExecutor(Math.max(1, preloadThreads / 4), 
preloadThreads, + 5, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(preloadThreads * 4), r -> { - String threadName = String.format("segment-prefetch-%s", Long.toHexString(System.nanoTime() & 0xFFFFF)); + String threadName = String.format("segment-preload-%s", Long.toHexString(System.nanoTime() & 0xFFFFF)); Thread thread = new Thread(r, threadName); thread.setUncaughtExceptionHandler((t, e) -> { if (!(e instanceof InterruptedException)) { @@ -126,7 +124,7 @@ protected void afterExecute(Runnable r, Throwable t) { (r, executor) -> { try { // force the caller thread to wait for space in the queue (this is always a thread in the dispatchPool) - // this creates back-pressure to the dispatchPool, slowing down the dispatching of new prefetch tasks + // this creates back-pressure to the dispatchPool, slowing down the dispatching of new preload tasks executor.getQueue().put(r); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -139,6 +137,8 @@ protected void afterExecute(Runnable r, Throwable t) { clearInProgressTask(r); } }; + preloadPool.allowCoreThreadTimeOut(true); + this.preloadPool = preloadPool; } @Override @@ -148,16 +148,29 @@ protected PersistentCache delegate() { @Override public @Nullable Buffer readSegment(long msb, long lsb, @NotNull Callable loader) { - dispatch(tarFiles.get(), msb, lsb); + dispatch(msb, lsb); return delegate().readSegment(msb, lsb, loader); } - private void dispatch(@NotNull TarFiles tarFiles, long msb, long lsb) { - dispatch(tarFiles, tarFiles::getIndices, msb, lsb, 0); + private void dispatch(long msb, long lsb) { + dispatch(msb, lsb, 1); } - private void dispatch(@NotNull TarFiles tarFiles, Supplier>> indicesSupplier, long msb, long lsb, int depth) { - execute(dispatchPool, new PrefetchDispatchTask(tarFiles, indicesSupplier, msb, lsb, depth)); + private void dispatch(long msb, long lsb, int depth) { + execute(dispatchPool, createDispatchTask(msb, lsb, depth)); + } + + @NotNull SegmentPreloader.DispatchTask 
createDispatchTask(long msb, long lsb, int depth) { + TarFiles tars = tarFiles.get(); + return new DispatchTask(tars, tars::getIndices, msb, lsb, depth); + } + + private void preload(long msb, long lsb, int depth) { + execute(preloadPool, createPreloadTask(msb, lsb, depth)); + } + + @NotNull SegmentPreloader.PreloadTask createPreloadTask(long msb, long lsb, int depth) { + return new PreloadTask(tarFiles.get(), msb, lsb, depth); } private void execute(ExecutorService pool, Runnable r) { @@ -177,22 +190,22 @@ private void clearInProgressTask(Runnable r) { @Override public void close() { try { - prefetchPool.shutdown(); + preloadPool.shutdown(); dispatchPool.shutdown(); - if (!prefetchPool.awaitTermination(4, TimeUnit.SECONDS)) { - prefetchPool.shutdownNow(); + if (!preloadPool.awaitTermination(4, TimeUnit.SECONDS)) { + preloadPool.shutdownNow(); } if (!dispatchPool.awaitTermination(1, TimeUnit.SECONDS)) { dispatchPool.shutdownNow(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); - prefetchPool.shutdownNow(); + preloadPool.shutdownNow(); dispatchPool.shutdownNow(); } } - private class PrefetchDispatchTask implements Runnable, Comparable { + class DispatchTask implements Runnable, Comparable { private final TarFiles tarFiles; @@ -206,13 +219,13 @@ private class PrefetchDispatchTask implements Runnable, Comparable>> indicesSupplier, long msb, long lsb, int depth) { - checkArgument(depth < prefetchDepth, "depth must be < %d, is %d", prefetchDepth, depth); + private DispatchTask(@NotNull TarFiles tarFiles, Supplier>> indicesSupplier, long msb, long lsb, int depth) { + checkArgument(depth <= preloadDepth, "depth must be <= %d, is %d", preloadDepth, depth); this.tarFiles = tarFiles; this.indicesSupplier = indicesSupplier; this.msb = msb; this.lsb = lsb; - this.depth = depth + 1; + this.depth = depth; LOG.debug("Created: {}", this); } @@ -239,26 +252,22 @@ public void run() { long refMsb = reference.getMostSignificantBits(); long refLsb = 
reference.getLeastSignificantBits(); if (!delegate.containsSegment(refMsb, refLsb)) { - prefetch(tarFiles, () -> indices, refMsb, refLsb, depth); - } else if (depth < prefetchDepth && SegmentId.isDataSegmentId(refLsb)) { - dispatch(tarFiles, () -> indices, refMsb, refLsb, depth); + preload(refMsb, refLsb, depth); + } else if (depth < preloadDepth && SegmentId.isDataSegmentId(refLsb)) { + dispatch(refMsb, refLsb, depth + 1); } } } - private void prefetch(TarFiles tarFiles, Supplier>> indicesSupplier, long msb, long lsb, int depth) { - execute(prefetchPool, new PrefetchTask(tarFiles, indicesSupplier, msb, lsb, depth)); - } - @Override public boolean equals(Object o) { if (this == o) { return true; } - if (o.getClass() != PrefetchDispatchTask.class) { + if (o.getClass() != DispatchTask.class) { return false; } - PrefetchDispatchTask that = (PrefetchDispatchTask) o; + DispatchTask that = (DispatchTask) o; return msb == that.msb && lsb == that.lsb && depth == that.depth; } @@ -269,10 +278,10 @@ public int hashCode() { @Override public String toString() { - return "PrefetchDispatchTask{segmentId=" + new UUID(msb, lsb) + ", depth=" + depth + '}'; + return "DispatchTask{segmentId=" + new UUID(msb, lsb) + ", depth=" + depth + '}'; } - private int getPrefetchDepth() { + private int getPreloadDepth() { return depth; } @@ -281,30 +290,27 @@ private long getCreationTime() { } @Override - public int compareTo(@NotNull SegmentPreloader.PrefetchDispatchTask o) { + public int compareTo(@NotNull SegmentPreloader.DispatchTask o) { return Comparator - .comparing(PrefetchDispatchTask::getPrefetchDepth) - .thenComparing(PrefetchDispatchTask::getCreationTime) + .comparing(DispatchTask::getPreloadDepth) + .thenComparing(DispatchTask::getCreationTime) .compare(this, o); } } - private class PrefetchTask implements Runnable { + class PreloadTask implements Runnable { private final TarFiles tarFiles; - private final Supplier>> indicesSupplier; - private final long msb; private final long lsb; 
private final int depth; - PrefetchTask(TarFiles tarFiles, Supplier>> indicesSupplier, long msb, long lsb, int depth) { - checkArgument(depth <= prefetchDepth, "depth must be <= %d, is %d", prefetchDepth, depth); + private PreloadTask(TarFiles tarFiles, long msb, long lsb, int depth) { + checkArgument(depth <= preloadDepth, "depth must be <= %d, is %d", preloadDepth, depth); this.tarFiles = tarFiles; - this.indicesSupplier = indicesSupplier; this.msb = msb; this.lsb = lsb; this.depth = depth; @@ -314,8 +320,8 @@ private class PrefetchTask implements Runnable { @Override public void run() { LOG.debug("Running: {}", this); - if (depth < prefetchDepth && SegmentId.isDataSegmentId(lsb)) { - dispatch(tarFiles, indicesSupplier, msb, lsb, depth); + if (depth < preloadDepth && SegmentId.isDataSegmentId(lsb)) { + dispatch(msb, lsb, depth + 1); } if (!delegate.containsSegment(msb, lsb)) { Buffer segmentBuffer = tarFiles.readSegment(msb, lsb); @@ -330,10 +336,10 @@ public boolean equals(Object o) { if (this == o) { return true; } - if (o.getClass() != PrefetchTask.class) { + if (o.getClass() != PreloadTask.class) { return false; } - PrefetchTask that = (PrefetchTask) o; + PreloadTask that = (PreloadTask) o; return msb == that.msb && lsb == that.lsb; } @@ -344,7 +350,7 @@ public int hashCode() { @Override public String toString() { - return "PrefetchTask{segmentId=" + new UUID(msb, lsb) + ", depth=" + depth + '}'; + return "PreloadTask{segmentId=" + new UUID(msb, lsb) + ", depth=" + depth + '}'; } } } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCachePreloadingConfiguration.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCachePreloadingConfiguration.java index 02d576ffe30..c8477da720c 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCachePreloadingConfiguration.java +++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCachePreloadingConfiguration.java @@ -21,41 +21,41 @@ import java.util.concurrent.Callable; /** - * Configuration for a segment prefetch mechanism that preloads segments into a - * {@link PersistentCache}. The prefetch mechanism is triggered whenever a segment + * Configuration for a segment preload mechanism that preloads segments into a + * {@link PersistentCache}. The preload mechanism is triggered whenever a segment * in the cache is {@link PersistentCache#readSegment(long, long, Callable)|accessed}. * When this happens, all segments referenced by the accessed segment are asynchronously - * prefetched. + * preloaded. *

- * Next to the concurrency level, i.e. how many threads are used for prefetching, the - * {@code prefetchDepth} (default: {@code 1}, which controls how many recursive levels - * of referenced segments are prefetched, can be configured. + * Next to the concurrency level, i.e. how many threads are used for preloading, the + * {@code maxPreloadDepth} (default: {@code 1}), which controls how many recursive levels + * of referenced segments are preloaded, can be configured. *

* Prefetching is done asynchronously, but it may add some overhead. It is primarily * recommended to parallelize slow I/O, e.g. when using a remote persistence. *

- * Different scenarios may warrant different prefetching strategies. A short-lived + * Different scenarios may warrant different preloading strategies. A short-lived * process traversing a repository (e.g. copy, offline-compaction) with an initially - * empty cache may benefit from a more threads and a higher prefetch-depth, while a + * empty cache may benefit from a more threads and a higher preload-depth, while a * long-running process, e.g. a web application, may perform better with fewer threads - * and a lower prefetch depth. + * and a lower preload depth. */ public class PersistentCachePreloadingConfiguration { private final int concurrency; - private int prefetchDepth; + private int maxPreloadDepth; - private PersistentCachePreloadingConfiguration(int concurrency, int prefetchDepth) { + private PersistentCachePreloadingConfiguration(int concurrency, int preloadDepth) { this.concurrency = concurrency; - this.prefetchDepth = prefetchDepth; + this.maxPreloadDepth = preloadDepth; } /** * Creates a new {@link PersistentCachePreloadingConfiguration} with the given concurrency - * level and a {@code prefetchDepth} of {@code 1}. + * level and a {@code preloadDepth} of {@code 1}. * - * @param concurrency number of threads to use for prefetching + * @param concurrency number of threads to use for preloading * @return a new configuration */ public static PersistentCachePreloadingConfiguration withConcurrency(int concurrency) { @@ -63,14 +63,14 @@ public static PersistentCachePreloadingConfiguration withConcurrency(int concurr } /** - * Set how many recursive levels of referenced segments should be prefetched. + * Set how many recursive levels of referenced segments should be preloaded. * - * @param prefetchDepth depth of the prefetching, i.e. how many levels of referenced - * segments should be prefetched (default: {@code 1}) + * @param maxPreloadDepth depth of the preloading, i.e. 
how many levels of referenced + * segments should be preloaded (default: {@code 1}) * @return this configuration */ - public PersistentCachePreloadingConfiguration withPrefetchDepth(int prefetchDepth) { - this.prefetchDepth = prefetchDepth; + public PersistentCachePreloadingConfiguration withMaxPreloadDepth(int maxPreloadDepth) { + this.maxPreloadDepth = maxPreloadDepth; return this; } @@ -78,15 +78,15 @@ public int getConcurrency() { return concurrency; } - public int getPrefetchDepth() { - return prefetchDepth; + public int getMaxPreloadDepth() { + return maxPreloadDepth; } @Override public String toString() { return "PersistentCachePreloadingConfiguration{" + "concurrency=" + concurrency + - ", prefetchDepth=" + prefetchDepth + + ", maxPreloadDepth=" + maxPreloadDepth + '}'; } } diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java index 2fe4146c067..389eb5d6781 100644 --- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java +++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java @@ -29,6 +29,8 @@ import org.apache.jackrabbit.oak.segment.file.FileStore; import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder; import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException; +import org.apache.jackrabbit.oak.segment.file.preloader.SegmentPreloader.DispatchTask; +import org.apache.jackrabbit.oak.segment.file.preloader.SegmentPreloader.PreloadTask; import org.apache.jackrabbit.oak.segment.file.tar.TarFiles; import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence; import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter; @@ -51,21 +53,27 @@ import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; 
import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.function.Supplier; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; public class SegmentPreloaderTest { @@ -97,7 +105,7 @@ public void viaFileStoreBuilder() throws InvalidFileStoreVersionException, IOExc MemoryTestCache persistentCache = new MemoryTestCache(); try (FileStore fileStore = FileStoreBuilder.fileStoreBuilder(folder.getRoot()) .withPersistentCache(persistentCache) - .withPersistentCachePreloading(PersistentCachePreloadingConfiguration.withConcurrency(4).withPrefetchDepth(1)) + .withPersistentCachePreloading(PersistentCachePreloadingConfiguration.withConcurrency(4).withMaxPreloadDepth(1)) .build()) { SegmentId root = fileStore.getRevisions().getPersistedHead().getSegmentId(); Segment segment = root.getSegment(); @@ -130,7 +138,7 @@ public void testPreloading() throws IOException, InvalidFileStoreVersionExceptio try (TarFiles tarFiles = createReadOnlyTarFiles(folder.getRoot(), persistence); SegmentPreloader preloadingCache = (SegmentPreloader)SegmentPreloader.decorate(underlyingCache, - PersistentCachePreloadingConfiguration.withConcurrency(8).withPrefetchDepth(2), () -> tarFiles); + PersistentCachePreloadingConfiguration.withConcurrency(8).withMaxPreloadDepth(2), () -> tarFiles); JournalFileReader journalFileReader = persistence.getJournalFile().openJournalReader()) { UUID root = getRootUUID(journalFileReader); @@ -168,6 +176,112 @@ public void 
testPreloading() throws IOException, InvalidFileStoreVersionExceptio } } + @Test + public void testDispatchTaskEquals() throws IOException { + withSegmentPreloader(preloader -> { + UUID uuid = UUID.randomUUID(); + long msb = uuid.getMostSignificantBits(); + long lsb = uuid.getLeastSignificantBits(); + + DispatchTask task1 = preloader.createDispatchTask(msb, lsb, 1); + assertEquals(task1, task1); + + DispatchTask task2 = preloader.createDispatchTask(msb, lsb, 1); + assertEquals(task1, task2); + + DispatchTask task3 = preloader.createDispatchTask(msb, lsb, 0); + assertNotEquals(task1, task3); + + DispatchTask task4 = preloader.createDispatchTask(msb, lsb + 1, 1); + assertNotEquals(task1, task4); + + DispatchTask task5 = preloader.createDispatchTask(msb + 1, lsb, 1); + assertNotEquals(task1, task5); + + assertNotEquals(task1, new Object()); + }); + } + + @Test + public void testDispatchTaskArgumentValidation() throws IOException { + withSegmentPreloader(preloader -> { + UUID uuid = UUID.randomUUID(); + assertThrows(IllegalArgumentException.class, () -> preloader.createDispatchTask(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 3)); + }); + } + + @Test + public void testDispatchTaskToString() throws IOException { + withSegmentPreloader(preloader -> { + UUID uuid = UUID.randomUUID(); + assertEquals( + "DispatchTask{segmentId=" + uuid + ", depth=1}", + preloader.createDispatchTask(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 1).toString()); + }); + } + + @Test + public void testDispatchTaskCompareTo() throws IOException { + withSegmentPreloader(preloader -> { + UUID uuid = UUID.randomUUID(); + DispatchTask task1 = preloader.createDispatchTask(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 2); + DispatchTask task2 = preloader.createDispatchTask(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 1); + DispatchTask task3 = preloader.createDispatchTask(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 
2); + List tasks = new ArrayList<>(); + tasks.add(task1); + tasks.add(task2); + tasks.add(task3); + Collections.sort(tasks); + assertEquals(List.of(task2, task3, task1), tasks); + }); + } + + @Test + public void testPreloadTaskEquals() throws IOException { + withSegmentPreloader(preloader -> { + UUID uuid = UUID.randomUUID(); + long msb = uuid.getMostSignificantBits(); + long lsb = uuid.getLeastSignificantBits(); + + PreloadTask task1 = preloader.createPreloadTask(msb, lsb, 1); + assertEquals(task1, task1); + + PreloadTask task2 = preloader.createPreloadTask(msb, lsb, 1); + assertEquals(task1, task2); + + PreloadTask task3 = preloader.createPreloadTask(msb, lsb, 0); + assertEquals(task1, task3); // depth is not considered for equality + + PreloadTask task4 = preloader.createPreloadTask(msb, lsb + 1, 1); + assertNotEquals(task1, task4); + + PreloadTask task5 = preloader.createPreloadTask(msb + 1, lsb, 1); + assertNotEquals(task1, task5); + + assertNotEquals(task1, new Object()); + + }); + } + + @Test + public void testPreloadTaskToString() throws IOException { + withSegmentPreloader(preloader -> { + UUID uuid = UUID.randomUUID(); + assertEquals("PreloadTask{segmentId=" + uuid + ", depth=1}", + preloader.createPreloadTask(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 1).toString()); + }); + } + + private void withSegmentPreloader(Consumer withPreloader) throws IOException { + MemoryTestCache cache = new MemoryTestCache(); + PersistentCachePreloadingConfiguration config = + PersistentCachePreloadingConfiguration.withConcurrency(2).withMaxPreloadDepth(2); + try (TarFiles tarFiles = createReadOnlyTarFiles(folder.getRoot(), new TarPersistence(folder.getRoot())); + SegmentPreloader preloader = (SegmentPreloader) SegmentPreloader.decorate(cache, config, () -> tarFiles)) { + withPreloader.accept(preloader); + } + } + private static @NotNull UUID getRootUUID(JournalFileReader journalFileReader) throws IOException { String line = journalFileReader.readLine(); 
String[] parts = line.split(":"); From d085b0dbdf1b2196bf6117470deee3a430e8bc59 Mon Sep 17 00:00:00 2001 From: Julian Sedding Date: Mon, 13 Oct 2025 15:20:12 +0200 Subject: [PATCH 5/5] OAK-11934 - segment preloading for PersistentCache - cleanup --- .../oak/segment/file/preloader/SegmentPreloader.java | 2 +- .../oak/segment/file/preloader/SegmentPreloaderTest.java | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java index 13ca8089824..0a136710fcf 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java @@ -174,7 +174,7 @@ private void preload(long msb, long lsb, int depth) { } private void execute(ExecutorService pool, Runnable r) { - if (registerInProgressTask(r)) { + if (!pool.isShutdown() && registerInProgressTask(r)) { pool.execute(r); } } diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java index 389eb5d6781..45c597c0d6b 100644 --- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java +++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java @@ -92,7 +92,7 @@ public void testDecorationSkippedForWrongArguments() { } @Test - public void viaFileStoreBuilder() throws InvalidFileStoreVersionException, IOException, CommitFailedException, InterruptedException { + public void viaFileStoreBuilder() throws InvalidFileStoreVersionException, IOException, CommitFailedException { try (FileStore fileStore = 
FileStoreBuilder.fileStoreBuilder(folder.getRoot()) .build()) { SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build(); @@ -120,7 +120,7 @@ public void viaFileStoreBuilder() throws InvalidFileStoreVersionException, IOExc } @Test - public void testPreloading() throws IOException, InvalidFileStoreVersionException, CommitFailedException, InterruptedException { + public void testPreloading() throws IOException, InvalidFileStoreVersionException, CommitFailedException { SegmentNodeStorePersistence persistence = new TarPersistence(folder.getRoot()); try (FileStore fileStore = FileStoreBuilder.fileStoreBuilder(folder.getRoot()) .withMaxFileSize(4) @@ -288,7 +288,7 @@ private void withSegmentPreloader(Consumer withPreloader) thro return UUID.fromString(parts[0]); } - private void assertReferencedSegmentsLoaded(Set referencedSegments, MemoryTestCache underlyingCache, SegmentPreloader preloadingCache) throws InterruptedException { + private void assertReferencedSegmentsLoaded(Set referencedSegments, MemoryTestCache underlyingCache, SegmentPreloader preloadingCache) { Set segments = new HashSet<>(referencedSegments); int timeoutSec = 10; Awaitility @@ -327,7 +327,7 @@ private TarFiles createReadOnlyTarFiles(File directory, SegmentNodeStorePersiste .build(); } - private static Set collectReferencedSegments(UUID root, Map> graph, int depth) throws IOException { + private static Set collectReferencedSegments(UUID root, Map> graph, int depth) { Set uuids = new LinkedHashSet<>(); uuids.add(root); if (depth > 0) {