diff --git a/oak-segment-tar/pom.xml b/oak-segment-tar/pom.xml index eab0cfd6573..883efdd3cd0 100644 --- a/oak-segment-tar/pom.xml +++ b/oak-segment-tar/pom.xml @@ -369,6 +369,12 @@ org.apache.sling.testing.osgi-mock.junit4 test + + org.awaitility + awaitility + 4.2.1 + test + org.osgi org.osgi.service.event diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java index 33a23b0e465..0d570fe145b 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java @@ -28,9 +28,12 @@ import java.util.Set; import java.util.UUID; import java.util.function.Consumer; +import java.util.function.Supplier; import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean; import org.apache.jackrabbit.oak.commons.Buffer; +import org.apache.jackrabbit.oak.commons.conditions.Validate; +import org.apache.jackrabbit.oak.commons.pio.Closer; import org.apache.jackrabbit.oak.segment.CachingSegmentReader; import org.apache.jackrabbit.oak.segment.RecordType; import org.apache.jackrabbit.oak.segment.Revisions; @@ -48,6 +51,7 @@ import org.apache.jackrabbit.oak.segment.SegmentStore; import org.apache.jackrabbit.oak.segment.SegmentTracker; import org.apache.jackrabbit.oak.segment.SegmentWriter; +import org.apache.jackrabbit.oak.segment.file.preloader.SegmentPreloader; import org.apache.jackrabbit.oak.segment.file.tar.EntryRecovery; import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration; import org.apache.jackrabbit.oak.segment.file.tar.TarFiles; @@ -55,6 +59,8 @@ import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor; import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitor; import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; +import 
org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache; +import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCachePreloadingConfiguration; import org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.apache.jackrabbit.oak.stats.StatsOptions; import org.jetbrains.annotations.NotNull; @@ -89,6 +95,8 @@ public abstract class AbstractFileStore implements SegmentStore, Closeable { */ private static final int MAX_STORE_VERSION = 2; + protected final @Nullable PersistentCache persistentCache; + static ManifestChecker newManifestChecker(SegmentNodeStorePersistence persistence, boolean strictVersionCheck) throws IOException { return ManifestChecker.newManifestChecker( persistence.getManifestFile(), @@ -130,7 +138,7 @@ public void recoverEntry(UUID uuid, byte[] data, EntryRecovery entryRecovery) th protected final IOMonitor ioMonitor; protected final RemoteStoreMonitor remoteStoreMonitor; - + protected final int binariesInlineThreshold; AbstractFileStore(final FileStoreBuilder builder) { @@ -156,6 +164,18 @@ public SegmentId newSegmentId(long msb, long lsb) { this.remoteStoreMonitor = builder.getRemoteStoreMonitor(); this.segmentBufferMonitor = new SegmentBufferMonitor(builder.getStatsProvider()); this.binariesInlineThreshold = builder.getBinariesInlineThreshold(); + this.persistentCache = initializePersistentCache(builder, this::getTarFiles); + } + + private static @Nullable PersistentCache initializePersistentCache(FileStoreBuilder builder, Supplier tarFilesSupplier) { + PersistentCache persistentCache = builder.getPersistentCache(); + PersistentCachePreloadingConfiguration preloadingConfig = builder.getPreloadingConfiguration(); + if (preloadingConfig != null) { + Validate.checkState(persistentCache != null, + "PersistentCache must be configured when using a PersistentCachePreloadConfiguration"); + persistentCache = SegmentPreloader.decorate(persistentCache, preloadingConfig, tarFilesSupplier); + } + return 
persistentCache; } static SegmentNotFoundException asSegmentNotFoundException(Exception e, SegmentId id) { @@ -165,6 +185,8 @@ static SegmentNotFoundException asSegmentNotFoundException(Exception e, SegmentI return new SegmentNotFoundException(id, e); } + abstract TarFiles getTarFiles(); + @NotNull public CacheStatsMBean getSegmentCacheStats() { return segmentCache.getCacheStats(); @@ -192,7 +214,7 @@ public SegmentReader getReader() { public SegmentIdProvider getSegmentIdProvider() { return tracker; } - + public int getBinariesInlineThreshold() { return binariesInlineThreshold; } @@ -281,6 +303,25 @@ public void consume(int number, RecordType type, int offset) { return binaryReferences; } + @Override + public void close() { + doClose(); + System.gc(); // for any memory-mappings that are no longer used + log.info("TarMK closed: {}", directory); + } + + protected void doClose() { + Closer closer = Closer.create(); + registerCloseables(closer); + closeAndLogOnFail(closer); + } + + protected void registerCloseables(Closer closer) { + if (persistentCache instanceof Closeable) { + closer.register((Closeable) persistentCache); + } + } + static void closeAndLogOnFail(Closeable closeable) { if (closeable != null) { try { @@ -292,11 +333,19 @@ static void closeAndLogOnFail(Closeable closeable) { } } - Segment readSegmentUncached(TarFiles tarFiles, SegmentId id) { - Buffer buffer = tarFiles.readSegment(id.getMostSignificantBits(), id.getLeastSignificantBits()); + Segment readSegmentUncached(SegmentId id) { + Buffer buffer; + if (persistentCache != null) { + buffer = persistentCache.readSegment(id.getMostSignificantBits(), id.getLeastSignificantBits(), + () -> getTarFiles().readSegment(id.getMostSignificantBits(), id.getLeastSignificantBits())); + } else { + buffer = getTarFiles().readSegment(id.getMostSignificantBits(), id.getLeastSignificantBits()); + } + if (buffer == null) { throw new SegmentNotFoundException(id); } + segmentBufferMonitor.trackAllocation(buffer); return 
new Segment(tracker, id, buffer); } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java index e933b5611e8..01776478b6b 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java @@ -249,6 +249,11 @@ FileStore bind(TarRevisions revisions) throws IOException { } } + @Override + TarFiles getTarFiles() { + return tarFiles; + } + @NotNull private Supplier initialNode() { return new Supplier() { @@ -483,21 +488,23 @@ public void close() { log.warn("Unable to flush the store", e); } - Closer closer = Closer.create(); - closer.register(repositoryLock::unlock); - closer.register(tarFiles) ; - closer.register(revisions); - - closeAndLogOnFail(closer); + doClose(); } // Try removing pending files in case the scheduler didn't have a chance to run yet System.gc(); // for any memory-mappings that are no longer used fileReaper.reap(); - log.info("TarMK closed: {}", directory); } + @Override + protected void registerCloseables(Closer closer) { + closer.register(repositoryLock::unlock); + closer.register(tarFiles) ; + closer.register(revisions); + super.registerCloseables(closer); + } + @Override public boolean containsSegment(SegmentId id) { try (ShutDownCloser ignored = shutDown.keepAlive()) { @@ -509,7 +516,7 @@ public boolean containsSegment(SegmentId id) { @NotNull public Segment readSegment(final SegmentId id) { try (ShutDownCloser ignored = shutDown.keepAlive()) { - return segmentCache.getSegment(id, () -> readSegmentUncached(tarFiles, id)); + return segmentCache.getSegment(id, () -> readSegmentUncached(id)); } catch (ExecutionException | UncheckedExecutionException e) { if (e.getCause() instanceof RepositoryNotReachableException) { RepositoryNotReachableException re = (RepositoryNotReachableException) e.getCause(); @@ 
-573,6 +580,10 @@ public void writeSegment(SegmentId id, byte[] buffer, int offset, int length) th if (!eagerSegmentCaching && segment != null) { segmentCache.putSegment(segment); } + + if (persistentCache != null) { + persistentCache.writeSegment(id.getMostSignificantBits(), id.getLeastSignificantBits(), Buffer.wrap(buffer, offset, length)); + } } } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java index fd390022321..04e68ae6f55 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java @@ -48,6 +48,8 @@ import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence; import org.apache.jackrabbit.oak.segment.spi.monitor.*; import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; +import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache; +import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCachePreloadingConfiguration; import org.apache.jackrabbit.oak.segment.tool.iotrace.IOTraceLogWriter; import org.apache.jackrabbit.oak.segment.tool.iotrace.IOTraceMonitor; import org.apache.jackrabbit.oak.spi.blob.BlobStore; @@ -94,7 +96,7 @@ public class FileStoreBuilder { private boolean memoryMapping = MEMORY_MAPPING_DEFAULT; private boolean offHeapAccess = getBoolean("access.off.heap"); - + private int binariesInlineThreshold = Segment.MEDIUM_LIMIT; private SegmentNodeStorePersistence persistence; @@ -108,6 +110,12 @@ public class FileStoreBuilder { @Nullable private EvictingWriteCacheManager cacheManager; + @Nullable + private PersistentCache persistentCache; + + @Nullable + private PersistentCachePreloadingConfiguration preloadingConfig; + private class FileStoreGCListener extends 
DelegatingGCMonitor implements GCListener { @Override public void compactionSucceeded(@NotNull GCGeneration newGeneration) { @@ -256,6 +264,34 @@ public FileStoreBuilder withNodeDeduplicationCacheSize(int nodeDeduplicationCach return this; } + /** + * The {@link PersistentCache} instance to be used, if any. By default no {@code PersistentCache} + * is configured. + * + * @param persistentCache the persistent cache + * @return this instance + */ + @NotNull + public FileStoreBuilder withPersistentCache(@NotNull PersistentCache persistentCache) { + this.persistentCache = requireNonNull(persistentCache); + return this; + } + + /** + * The {@link PersistentCachePreloadingConfiguration} to be used for preloading segments. + * This configuration must be used together with a {@link PersistentCache}, configured + * via {@link #withPersistentCache(PersistentCache)}. By default, no segment preloading + * is configured. + * + * @param preloadingConfig the configuration for persistent cache preloading + * @return this instance + */ + @NotNull + public FileStoreBuilder withPersistentCachePreloading(@NotNull PersistentCachePreloadingConfiguration preloadingConfig) { + this.preloadingConfig = requireNonNull(preloadingConfig); + return this; + } + /** * Turn memory mapping on or off * @@ -397,7 +433,7 @@ public FileStoreBuilder withEagerSegmentCaching(boolean eagerSegmentCaching) { this.eagerSegmentCaching = eagerSegmentCaching; return this; } - + /** * Sets the threshold under which binaries are inlined in data segments. 
* @param binariesInlineThreshold the threshold @@ -585,11 +621,19 @@ boolean getStrictVersionCheck() { boolean getEagerSegmentCaching() { return eagerSegmentCaching; } - + int getBinariesInlineThreshold() { return binariesInlineThreshold; } + @Nullable PersistentCache getPersistentCache() { + return persistentCache; + } + + @Nullable PersistentCachePreloadingConfiguration getPreloadingConfiguration() { + return preloadingConfig; + } + @Override public String toString() { return "FileStoreBuilder{" + @@ -607,6 +651,8 @@ public String toString() { ", memoryMapping=" + memoryMapping + ", offHeapAccess=" + offHeapAccess + ", gcOptions=" + gcOptions + + ", persistentCache=" + persistentCache + + ", persistentCachePreloadingConfiguration=" + preloadingConfig + '}'; } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java index 5037f172ffe..a74e1a4f0d6 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java @@ -27,7 +27,6 @@ import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; @@ -94,6 +93,11 @@ ReadOnlyFileStore bind(@NotNull ReadOnlyRevisions revisions) throws IOException return this; } + @Override + TarFiles getTarFiles() { + return tarFiles; + } + /** * Go to the specified {@code revision} * @@ -120,25 +124,17 @@ public boolean containsSegment(SegmentId id) { @NotNull public Segment readSegment(final SegmentId id) { try { - return segmentCache.getSegment(id, new Callable() { - @Override - public Segment call() throws Exception { - return readSegmentUncached(tarFiles, id); - } - }); + return segmentCache.getSegment(id, () -> 
readSegmentUncached(id)); } catch (ExecutionException | UncheckedExecutionException e) { throw asSegmentNotFoundException(e, id); } } @Override - public void close() { - Closer closer = Closer.create(); + protected void registerCloseables(Closer closer) { closer.register(tarFiles); closer.register(revisions); - closeAndLogOnFail(closer); - System.gc(); // for any memory-mappings that are no longer used - log.info("TarMK closed: {}", directory); + super.registerCloseables(closer); } @NotNull diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java new file mode 100644 index 00000000000..0a136710fcf --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloader.java @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.jackrabbit.oak.segment.file.preloader; + +import org.apache.jackrabbit.oak.commons.Buffer; +import org.apache.jackrabbit.oak.commons.internal.function.Suppliers; +import org.apache.jackrabbit.oak.segment.SegmentId; +import org.apache.jackrabbit.oak.segment.file.tar.TarFiles; +import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.DelegatingPersistentCache; +import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache; +import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCachePreloadingConfiguration; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Comparator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import static org.apache.jackrabbit.oak.commons.conditions.Validate.checkArgument; + +/** + * A {@link PersistentCache} decorator that preloads segments into the cache by + * asynchronously preloading segments referenced by a segment that is being read + * from the cache. 
+ * + * @see PersistentCachePreloadingConfiguration + */ +public class SegmentPreloader extends DelegatingPersistentCache implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(SegmentPreloader.class); + + private final Map inProgressPrefetch; + + private final ConcurrentHashMap>> graphCache; + + private final PersistentCache delegate; + + private final ExecutorService dispatchPool; + + private final ExecutorService preloadPool; + + private final int preloadDepth; + + private final Supplier tarFiles; + + /** + * Factory method that decorates the given {@link PersistentCache} with a + * {@link SegmentPreloader} if the given configuration requires preloading. + * Otherwise, the given {@code delegate} is returned as-is. + * + * @param delegate the cache to decorate + * @param config the preloading configuration + * @param tarFiles a supplier of the {@link TarFiles} instance used to read segments and segment graphs + * @return the decorated cache or the given {@code delegate} if no preloading is configured + */ + public static @NotNull PersistentCache decorate(@NotNull PersistentCache delegate, @NotNull PersistentCachePreloadingConfiguration config, @NotNull Supplier tarFiles) { + if (config.getConcurrency() > 0 && config.getMaxPreloadDepth() > 0) { + return new SegmentPreloader(delegate, config, tarFiles); + } + return delegate; + } + + private SegmentPreloader(@NotNull PersistentCache delegate, @NotNull PersistentCachePreloadingConfiguration config, @NotNull Supplier tarFiles) { + this.delegate = delegate; + this.tarFiles = Suppliers.memoize(tarFiles); + this.inProgressPrefetch = new ConcurrentHashMap<>(); + this.graphCache = new ConcurrentHashMap<>(); + this.preloadDepth = config.getMaxPreloadDepth(); + this.dispatchPool = new ThreadPoolExecutor(1,1, + 1, TimeUnit.SECONDS, + new PriorityBlockingQueue<>(), + r -> new Thread(r, "segment-preload-dispatcher")) { + @Override + protected void afterExecute(Runnable r, Throwable t) { + 
super.afterExecute(r, t); + clearInProgressTask(r); + } + }; + int preloadThreads = config.getConcurrency(); + ThreadPoolExecutor preloadPool = new ThreadPoolExecutor(Math.max(1, preloadThreads / 4), preloadThreads, + 5, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(preloadThreads * 4), + r -> { + String threadName = String.format("segment-preload-%s", Long.toHexString(System.nanoTime() & 0xFFFFF)); + Thread thread = new Thread(r, threadName); + thread.setUncaughtExceptionHandler((t, e) -> { + if (!(e instanceof InterruptedException)) { + LOG.warn("Uncaught exception in thread {}", t.getName(), e); + } + }); + return thread; + }, + (r, executor) -> { + try { + // force the caller thread to wait for space in the queue (this is always a thread in the dispatchPool) + // this creates back-pressure to the dispatchPool, slowing down the dispatching of new preload tasks + executor.getQueue().put(r); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }) { + + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + clearInProgressTask(r); + } + }; + preloadPool.allowCoreThreadTimeOut(true); + this.preloadPool = preloadPool; + } + + @Override + protected PersistentCache delegate() { + return delegate; + } + + @Override + public @Nullable Buffer readSegment(long msb, long lsb, @NotNull Callable loader) { + dispatch(msb, lsb); + return delegate().readSegment(msb, lsb, loader); + } + + private void dispatch(long msb, long lsb) { + dispatch(msb, lsb, 1); + } + + private void dispatch(long msb, long lsb, int depth) { + execute(dispatchPool, createDispatchTask(msb, lsb, depth)); + } + + @NotNull SegmentPreloader.DispatchTask createDispatchTask(long msb, long lsb, int depth) { + TarFiles tars = tarFiles.get(); + return new DispatchTask(tars, tars::getIndices, msb, lsb, depth); + } + + private void preload(long msb, long lsb, int depth) { + execute(preloadPool, createPreloadTask(msb, lsb, depth)); + } + + 
@NotNull SegmentPreloader.PreloadTask createPreloadTask(long msb, long lsb, int depth) { + return new PreloadTask(tarFiles.get(), msb, lsb, depth); + } + + private void execute(ExecutorService pool, Runnable r) { + if (!pool.isShutdown() && registerInProgressTask(r)) { + pool.execute(r); + } + } + + private boolean registerInProgressTask(Runnable r) { + return inProgressPrefetch.putIfAbsent(r.hashCode(), Thread.currentThread().getName()) == null; + } + + private void clearInProgressTask(Runnable r) { + inProgressPrefetch.remove(r.hashCode()); + } + + @Override + public void close() { + try { + preloadPool.shutdown(); + dispatchPool.shutdown(); + if (!preloadPool.awaitTermination(4, TimeUnit.SECONDS)) { + preloadPool.shutdownNow(); + } + if (!dispatchPool.awaitTermination(1, TimeUnit.SECONDS)) { + dispatchPool.shutdownNow(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + preloadPool.shutdownNow(); + dispatchPool.shutdownNow(); + } + } + + class DispatchTask implements Runnable, Comparable { + + private final TarFiles tarFiles; + + private final Supplier>> indicesSupplier; + + private final long msb; + + private final long lsb; + + private final int depth; + + private final long creationTime = System.nanoTime(); + + private DispatchTask(@NotNull TarFiles tarFiles, Supplier>> indicesSupplier, long msb, long lsb, int depth) { + checkArgument(depth <= preloadDepth, "depth must be <= %d, is %d", preloadDepth, depth); + this.tarFiles = tarFiles; + this.indicesSupplier = indicesSupplier; + this.msb = msb; + this.lsb = lsb; + this.depth = depth; + LOG.debug("Created: {}", this); + } + + @Override + public void run() { + LOG.debug("Running: {}", this); + UUID uuid = new UUID(msb, lsb); + Map> indices = indicesSupplier.get(); + String archiveName = indices.entrySet().stream() + .filter(entry -> entry.getValue().contains(uuid)) + .findFirst() + .map(Map.Entry::getKey) + .orElse(null); + + Map> graph = graphCache.computeIfAbsent(archiveName, 
name -> { + try { + return tarFiles.getGraph(name); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + + for (UUID reference : graph.get(uuid)) { + long refMsb = reference.getMostSignificantBits(); + long refLsb = reference.getLeastSignificantBits(); + if (!delegate.containsSegment(refMsb, refLsb)) { + preload(refMsb, refLsb, depth); + } else if (depth < preloadDepth && SegmentId.isDataSegmentId(refLsb)) { + dispatch(refMsb, refLsb, depth + 1); + } + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o.getClass() != DispatchTask.class) { + return false; + } + DispatchTask that = (DispatchTask) o; + return msb == that.msb && lsb == that.lsb && depth == that.depth; + } + + @Override + public int hashCode() { + return Objects.hash(getClass(), msb, lsb, depth); + } + + @Override + public String toString() { + return "DispatchTask{segmentId=" + new UUID(msb, lsb) + ", depth=" + depth + '}'; + } + + private int getPreloadDepth() { + return depth; + } + + private long getCreationTime() { + return creationTime; + } + + @Override + public int compareTo(@NotNull SegmentPreloader.DispatchTask o) { + return Comparator + .comparing(DispatchTask::getPreloadDepth) + .thenComparing(DispatchTask::getCreationTime) + .compare(this, o); + } + } + + class PreloadTask implements Runnable { + + private final TarFiles tarFiles; + + private final long msb; + + private final long lsb; + + private final int depth; + + private PreloadTask(TarFiles tarFiles, long msb, long lsb, int depth) { + checkArgument(depth <= preloadDepth, "depth must be <= %d, is %d", preloadDepth, depth); + this.tarFiles = tarFiles; + this.msb = msb; + this.lsb = lsb; + this.depth = depth; + LOG.debug("Created: {}", this); + } + + @Override + public void run() { + LOG.debug("Running: {}", this); + if (depth < preloadDepth && SegmentId.isDataSegmentId(lsb)) { + dispatch(msb, lsb, depth + 1); + } + if (!delegate.containsSegment(msb, lsb)) { + 
Buffer segmentBuffer = tarFiles.readSegment(msb, lsb); + if (segmentBuffer != null) { + delegate.writeSegment(msb, lsb, segmentBuffer); + } + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o.getClass() != PreloadTask.class) { + return false; + } + PreloadTask that = (PreloadTask) o; + return msb == that.msb && lsb == that.lsb; + } + + @Override + public int hashCode() { + return Objects.hash(getClass(), msb, lsb); + } + + @Override + public String toString() { + return "PreloadTask{segmentId=" + new UUID(msb, lsb) + ", depth=" + depth + '}'; + } + } +} diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/DelegatingPersistentCache.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/DelegatingPersistentCache.java new file mode 100644 index 00000000000..4895e4baa9b --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/DelegatingPersistentCache.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache; + +import org.apache.jackrabbit.oak.commons.Buffer; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.util.concurrent.Callable; + +/** + * Simple abstract implementation of a delegating PersistentCache that can be + * used as a base class for {@link PersistentCache} decorators. + */ +public abstract class DelegatingPersistentCache implements PersistentCache { + + protected abstract PersistentCache delegate(); + + @Override + public @Nullable Buffer readSegment(long msb, long lsb, @NotNull Callable loader) { + return delegate().readSegment(msb, lsb, loader); + } + + @Override + public boolean containsSegment(long msb, long lsb) { + return delegate().containsSegment(msb, lsb); + } + + @Override + public void writeSegment(long msb, long lsb, Buffer buffer) { + delegate().writeSegment(msb, lsb, buffer); + } + + @Override + public void cleanUp() { + delegate().cleanUp(); + } +} diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCachePreloadingConfiguration.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCachePreloadingConfiguration.java new file mode 100644 index 00000000000..c8477da720c --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCachePreloadingConfiguration.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache; + +import java.util.concurrent.Callable; + +/** + * Configuration for a segment preload mechanism that preloads segments into a + * {@link PersistentCache}. The preload mechanism is triggered whenever a segment + * in the cache is {@link PersistentCache#readSegment(long, long, Callable) accessed}. + * When this happens, all segments referenced by the accessed segment are asynchronously + * preloaded. + *

+ Next to the concurrency level, i.e. how many threads are used for preloading, the + {@code maxPreloadDepth} (default: {@code 1}), which controls how many recursive levels + of referenced segments are preloaded, can be configured. + *

+ Preloading is done asynchronously, but it may add some overhead. It is primarily + recommended to parallelize slow I/O, e.g. when using a remote persistence. + *

+ Different scenarios may warrant different preloading strategies. A short-lived + process traversing a repository (e.g. copy, offline-compaction) with an initially + empty cache may benefit from more threads and a higher preload-depth, while a + long-running process, e.g. a web application, may perform better with fewer threads + and a lower preload depth. + */ +public class PersistentCachePreloadingConfiguration { + + private final int concurrency; + + private int maxPreloadDepth; + + private PersistentCachePreloadingConfiguration(int concurrency, int preloadDepth) { + this.concurrency = concurrency; + this.maxPreloadDepth = preloadDepth; + } + + /** + * Creates a new {@link PersistentCachePreloadingConfiguration} with the given concurrency + * level and a {@code preloadDepth} of {@code 1}. + * + * @param concurrency number of threads to use for preloading + * @return a new configuration + */ + public static PersistentCachePreloadingConfiguration withConcurrency(int concurrency) { + return new PersistentCachePreloadingConfiguration(concurrency, 1); + } + + /** + * Set how many recursive levels of referenced segments should be preloaded. + * + * @param maxPreloadDepth depth of the preloading, i.e. 
how many levels of referenced + * segments should be preloaded (default: {@code 1}) + * @return this configuration + */ + public PersistentCachePreloadingConfiguration withMaxPreloadDepth(int maxPreloadDepth) { + this.maxPreloadDepth = maxPreloadDepth; + return this; + } + + public int getConcurrency() { + return concurrency; + } + + public int getMaxPreloadDepth() { + return maxPreloadDepth; + } + + @Override + public String toString() { + return "PersistentCachePreloadingConfiguration{" + + "concurrency=" + concurrency + + ", maxPreloadDepth=" + maxPreloadDepth + + '}'; + } +} diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/package-info.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/package-info.java index db4c23c9902..1573faf0346 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/package-info.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/package-info.java @@ -15,7 +15,7 @@ * limitations under the License. */ @Internal(since = "1.0.0") -@Version("5.0.0") +@Version("5.1.0") package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache; import org.apache.jackrabbit.oak.commons.annotations.Internal; diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java new file mode 100644 index 00000000000..45c597c0d6b --- /dev/null +++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/preloader/SegmentPreloaderTest.java @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.segment.file.preloader; + +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.RandomUtils; +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.commons.Buffer; +import org.apache.jackrabbit.oak.segment.Segment; +import org.apache.jackrabbit.oak.segment.SegmentId; +import org.apache.jackrabbit.oak.segment.SegmentNodeStore; +import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; +import org.apache.jackrabbit.oak.segment.file.FileStore; +import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder; +import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException; +import org.apache.jackrabbit.oak.segment.file.preloader.SegmentPreloader.DispatchTask; +import org.apache.jackrabbit.oak.segment.file.preloader.SegmentPreloader.PreloadTask; +import org.apache.jackrabbit.oak.segment.file.tar.TarFiles; +import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence; +import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter; +import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitorAdapter; +import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileReader; +import 
org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; +import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache; +import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCachePreloadingConfiguration; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.awaitility.Awaitility; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +public class SegmentPreloaderTest { + + private static final Logger LOG = LoggerFactory.getLogger(SegmentPreloaderTest.class); + + @Rule + public TemporaryFolder folder = new TemporaryFolder(new File("target")); + + @Test + public void testDecorationSkippedForWrongArguments() { + Supplier tarFiles = () -> null; // never called + PersistentCache delegate = new MemoryTestCache(); + PersistentCache decorated = SegmentPreloader.decorate(delegate, 
PersistentCachePreloadingConfiguration.withConcurrency(0), tarFiles); + assertSame(delegate, decorated); + } + + @Test + public void viaFileStoreBuilder() throws InvalidFileStoreVersionException, IOException, CommitFailedException { + try (FileStore fileStore = FileStoreBuilder.fileStoreBuilder(folder.getRoot()) + .build()) { + SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build(); + NodeBuilder builder = nodeStore.getRoot().builder(); + + generateContent(builder, 4, 4); + nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + } + + MemoryTestCache persistentCache = new MemoryTestCache(); + try (FileStore fileStore = FileStoreBuilder.fileStoreBuilder(folder.getRoot()) + .withPersistentCache(persistentCache) + .withPersistentCachePreloading(PersistentCachePreloadingConfiguration.withConcurrency(4).withMaxPreloadDepth(1)) + .build()) { + SegmentId root = fileStore.getRevisions().getPersistedHead().getSegmentId(); + Segment segment = root.getSegment(); + + int expectedCacheSize = 1 + segment.getReferencedSegmentIdCount(); + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals(expectedCacheSize, persistentCache.segments.size())); + + assertTrue(persistentCache.containsSegment(root.getMostSignificantBits(), root.getLeastSignificantBits())); + } + } + + @Test + public void testPreloading() throws IOException, InvalidFileStoreVersionException, CommitFailedException { + SegmentNodeStorePersistence persistence = new TarPersistence(folder.getRoot()); + try (FileStore fileStore = FileStoreBuilder.fileStoreBuilder(folder.getRoot()) + .withMaxFileSize(4) + .withMemoryMapping(false) + .withCustomPersistence(persistence) + .build()) { + SegmentNodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build(); + NodeBuilder builder = nodeStore.getRoot().builder(); + + generateContent(builder, 4, 8); + nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + } + + MemoryTestCache 
underlyingCache = new MemoryTestCache(); + + try (TarFiles tarFiles = createReadOnlyTarFiles(folder.getRoot(), persistence); + SegmentPreloader preloadingCache = (SegmentPreloader)SegmentPreloader.decorate(underlyingCache, + PersistentCachePreloadingConfiguration.withConcurrency(8).withMaxPreloadDepth(2), () -> tarFiles); + JournalFileReader journalFileReader = persistence.getJournalFile().openJournalReader()) { + + UUID root = getRootUUID(journalFileReader); + + assertTrue(tarFiles.getIndices().size() > 2); + Map> graph = computeFullGraph(tarFiles); + + Set referencedSegments = collectReferencedSegments(root, graph, 1); + for (UUID segment : referencedSegments) { + assertFalse(underlyingCache.containsSegment(segment.getMostSignificantBits(), segment.getLeastSignificantBits())); + assertFalse(preloadingCache.containsSegment(segment.getMostSignificantBits(), segment.getLeastSignificantBits())); + } + + preloadingCache.readSegment(root.getMostSignificantBits(), root.getLeastSignificantBits(), + () -> tarFiles.readSegment(root.getMostSignificantBits(), root.getLeastSignificantBits())); + assertReferencedSegmentsLoaded(referencedSegments, underlyingCache, preloadingCache); + + UUID nextToLoad = null; + Set uuids = null; + for (UUID referencedSegment : referencedSegments) { + uuids = collectReferencedSegments(referencedSegment, graph, 2); + uuids.removeIf(uuid -> underlyingCache.containsSegment(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits())); + if (!uuids.isEmpty()) { + nextToLoad = referencedSegment; + } + } + + assertNotNull(nextToLoad); + + final UUID next = nextToLoad; + preloadingCache.readSegment(next.getMostSignificantBits(), next.getLeastSignificantBits(), + () -> tarFiles.readSegment(next.getMostSignificantBits(), next.getLeastSignificantBits())); + LOG.info("Next loaded segment: {}", next); + assertReferencedSegmentsLoaded(uuids, underlyingCache, preloadingCache); + } + } + + @Test + public void testDispatchTaskEquals() throws IOException { + 
withSegmentPreloader(preloader -> { + UUID uuid = UUID.randomUUID(); + long msb = uuid.getMostSignificantBits(); + long lsb = uuid.getLeastSignificantBits(); + + DispatchTask task1 = preloader.createDispatchTask(msb, lsb, 1); + assertEquals(task1, task1); + + DispatchTask task2 = preloader.createDispatchTask(msb, lsb, 1); + assertEquals(task1, task2); + + DispatchTask task3 = preloader.createDispatchTask(msb, lsb, 0); + assertNotEquals(task1, task3); + + DispatchTask task4 = preloader.createDispatchTask(msb, lsb + 1, 1); + assertNotEquals(task1, task4); + + DispatchTask task5 = preloader.createDispatchTask(msb + 1, lsb, 1); + assertNotEquals(task1, task5); + + assertNotEquals(task1, new Object()); + }); + } + + @Test + public void testDispatchTaskArgumentValidation() throws IOException { + withSegmentPreloader(preloader -> { + UUID uuid = UUID.randomUUID(); + assertThrows(IllegalArgumentException.class, () -> preloader.createDispatchTask(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 3)); + }); + } + + @Test + public void testDispatchTaskToString() throws IOException { + withSegmentPreloader(preloader -> { + UUID uuid = UUID.randomUUID(); + assertEquals( + "DispatchTask{segmentId=" + uuid + ", depth=1}", + preloader.createDispatchTask(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 1).toString()); + }); + } + + @Test + public void testDispatchTaskCompareTo() throws IOException { + withSegmentPreloader(preloader -> { + UUID uuid = UUID.randomUUID(); + DispatchTask task1 = preloader.createDispatchTask(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 2); + DispatchTask task2 = preloader.createDispatchTask(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 1); + DispatchTask task3 = preloader.createDispatchTask(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 2); + List tasks = new ArrayList<>(); + tasks.add(task1); + tasks.add(task2); + tasks.add(task3); + Collections.sort(tasks); + 
assertEquals(List.of(task2, task3, task1), tasks); + }); + } + + @Test + public void testPreloadTaskEquals() throws IOException { + withSegmentPreloader(preloader -> { + UUID uuid = UUID.randomUUID(); + long msb = uuid.getMostSignificantBits(); + long lsb = uuid.getLeastSignificantBits(); + + PreloadTask task1 = preloader.createPreloadTask(msb, lsb, 1); + assertEquals(task1, task1); + + PreloadTask task2 = preloader.createPreloadTask(msb, lsb, 1); + assertEquals(task1, task2); + + PreloadTask task3 = preloader.createPreloadTask(msb, lsb, 0); + assertEquals(task1, task3); // depth is not considered for equality + + PreloadTask task4 = preloader.createPreloadTask(msb, lsb + 1, 1); + assertNotEquals(task1, task4); + + PreloadTask task5 = preloader.createPreloadTask(msb + 1, lsb, 1); + assertNotEquals(task1, task5); + + assertNotEquals(task1, new Object()); + + }); + } + + @Test + public void testPreloadTaskToString() throws IOException { + withSegmentPreloader(preloader -> { + UUID uuid = UUID.randomUUID(); + assertEquals("PreloadTask{segmentId=" + uuid + ", depth=1}", + preloader.createPreloadTask(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(), 1).toString()); + }); + } + + private void withSegmentPreloader(Consumer withPreloader) throws IOException { + MemoryTestCache cache = new MemoryTestCache(); + PersistentCachePreloadingConfiguration config = + PersistentCachePreloadingConfiguration.withConcurrency(2).withMaxPreloadDepth(2); + try (TarFiles tarFiles = createReadOnlyTarFiles(folder.getRoot(), new TarPersistence(folder.getRoot())); + SegmentPreloader preloader = (SegmentPreloader) SegmentPreloader.decorate(cache, config, () -> tarFiles)) { + withPreloader.accept(preloader); + } + } + + private static @NotNull UUID getRootUUID(JournalFileReader journalFileReader) throws IOException { + String line = journalFileReader.readLine(); + String[] parts = line.split(":"); + return UUID.fromString(parts[0]); + } + + private void 
assertReferencedSegmentsLoaded(Set referencedSegments, MemoryTestCache underlyingCache, SegmentPreloader preloadingCache) { + Set segments = new HashSet<>(referencedSegments); + int timeoutSec = 10; + Awaitility + .await() + .atMost(timeoutSec, TimeUnit.SECONDS) + .untilAsserted(() -> { + segments.removeIf(uuid -> + underlyingCache.containsSegment(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()) + && preloadingCache.containsSegment(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits())); + assertEquals("Not all referenced segments have been preloaded within " + timeoutSec + " seconds", + Set.of(), segments); + }); + + + } + + private static Map> computeFullGraph(TarFiles tarFiles) throws IOException { + Map> fullGraph = new HashMap<>(); + for (String archiveName : tarFiles.getIndices().keySet()) { + Map> graph = tarFiles.getGraph(archiveName); + fullGraph.putAll(graph); + } + return fullGraph; + } + + private TarFiles createReadOnlyTarFiles(File directory, SegmentNodeStorePersistence persistence) throws IOException { + return TarFiles.builder() + .withDirectory(directory) + .withPersistence(persistence) + .withReadOnly() + .withIOMonitor(new IOMonitorAdapter()) + .withRemoteStoreMonitor(new RemoteStoreMonitorAdapter()) + .withTarRecovery((uuid, data, entryRecovery) -> { + throw new UnsupportedOperationException(); + }) + .build(); + } + + private static Set collectReferencedSegments(UUID root, Map> graph, int depth) { + Set uuids = new LinkedHashSet<>(); + uuids.add(root); + if (depth > 0) { + for (UUID edge : graph.get(root)) { + uuids.addAll(collectReferencedSegments(edge, graph, depth - 1)); + } + } + return uuids; + } + + private void generateContent(NodeBuilder builder, int childNodes, int depth) { + RandomUtils r = RandomUtils.insecure(); + RandomStringUtils random = RandomStringUtils.insecure(); + for (int i = 0; i < childNodes; i++) { + NodeBuilder child = builder.child(random.nextAlphabetic(40, 60)); + 
child.setProperty("jcr:primaryType", random.nextAlphabetic(4)); + child.setProperty(random.nextAlphabetic(30, 40), r.randomBoolean() ? random.nextAlphabetic(100, 150) : r.randomLong()); + if (depth > 1) { + generateContent(child, childNodes, depth - 1); + } + } + } + + private static class MemoryTestCache implements PersistentCache { + + Map segments = new ConcurrentHashMap<>(); + + @Override + public @Nullable Buffer readSegment(long msb, long lsb, @NotNull Callable loader) { + return segments.computeIfAbsent(lsb, i -> { + try { + return loader.call(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Override + public boolean containsSegment(long msb, long lsb) { + return segments.containsKey(lsb); + } + + @Override + public void writeSegment(long msb, long lsb, Buffer buffer) { + segments.put(lsb, buffer); + } + + @Override + public void cleanUp() { + segments.clear(); + } + } + +}