Skip to content

Commit 3cb10d3

Browse files
committed
OAK-11934 - segment prefetching for segmentstore cache
1 parent bca1a68 commit 3cb10d3

File tree

6 files changed

+264
-17
lines changed

6 files changed

+264
-17
lines changed

oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentCache.java

Lines changed: 194 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,20 @@
2020
package org.apache.jackrabbit.oak.segment;
2121

2222
import static java.util.Objects.requireNonNull;
23+
import static org.apache.jackrabbit.oak.commons.conditions.Validate.checkArgument;
2324
import static org.apache.jackrabbit.oak.segment.CacheWeights.segmentWeight;
2425

26+
import java.io.Closeable;
27+
import java.io.IOException;
28+
import java.util.UUID;
2529
import java.util.concurrent.Callable;
2630
import java.util.concurrent.ExecutionException;
31+
import java.util.concurrent.LinkedBlockingQueue;
32+
import java.util.concurrent.ThreadPoolExecutor;
33+
import java.util.concurrent.TimeUnit;
2734
import java.util.concurrent.atomic.AtomicInteger;
2835
import java.util.concurrent.atomic.AtomicLong;
36+
import java.util.function.Function;
2937
import java.util.function.Supplier;
3038

3139
import org.apache.jackrabbit.guava.common.cache.Cache;
@@ -35,6 +43,9 @@
3543
import org.apache.jackrabbit.oak.cache.AbstractCacheStats;
3644
import org.apache.jackrabbit.oak.segment.CacheWeights.SegmentCacheWeigher;
3745
import org.jetbrains.annotations.NotNull;
46+
import org.jetbrains.annotations.Nullable;
47+
import org.slf4j.Logger;
48+
import org.slf4j.LoggerFactory;
3849

3950
/**
4051
* A cache for {@link SegmentId#isDataSegmentId() data} {@link Segment}
@@ -46,7 +57,7 @@
4657
* SegmentId#segment}. Every time an segment is evicted from this cache the
4758
* memoised segment is discarded (see {@code SegmentId#onAccess}.
4859
*/
49-
public abstract class SegmentCache {
60+
public abstract class SegmentCache implements Closeable {
5061

5162
/**
5263
* Default maximum weight of this cache in MB
@@ -59,17 +70,71 @@ public abstract class SegmentCache {
5970
* Create a new segment cache of the given size. Returns an always empty
6071
* cache for {@code cacheSizeMB <= 0}.
6172
*
62-
* @param cacheSizeMB size of the cache in megabytes.
73+
* @param cacheSizeMB size of the cache in megabytes.
6374
*/
6475
@NotNull
65-
public static SegmentCache newSegmentCache(long cacheSizeMB) {
66-
if (cacheSizeMB > 0) {
76+
public static SegmentCache newSegmentCache(int cacheSizeMB) {
77+
return new SegmentCache.Config()
78+
.withCacheSizeMB(cacheSizeMB)
79+
.build(null, null);
80+
}
81+
82+
private static SegmentCache newSegmentCache(int cacheSizeMB, int prefetchThreads, int prefetchDepth, @Nullable Function<UUID, SegmentId> uuidToSegmentId, @Nullable Function<SegmentId, Segment> segmentLoader) {
83+
if (cacheSizeMB > 0 && prefetchThreads > 0 && prefetchDepth > 0 && uuidToSegmentId != null && segmentLoader != null) {
84+
return new PrefetchCache(cacheSizeMB, prefetchThreads, prefetchDepth, uuidToSegmentId, segmentLoader);
85+
} else if (cacheSizeMB > 0) {
6786
return new NonEmptyCache(cacheSizeMB);
6887
} else {
6988
return new EmptyCache();
7089
}
7190
}
7291

92+
public static final class Config {
93+
94+
private int cacheSizeMB = DEFAULT_SEGMENT_CACHE_MB;
95+
96+
private int prefetchThreads = 0;
97+
98+
private int prefetchDepth = 1;
99+
100+
public Config withCacheSizeMB(int cacheSizeMB) {
101+
this.cacheSizeMB = cacheSizeMB;
102+
return this;
103+
}
104+
105+
public Config withPrefetchThreads(int prefetchThreads) {
106+
this.prefetchThreads = prefetchThreads;
107+
return this;
108+
}
109+
110+
public Config withPrefetchDepth(int prefetchDepth) {
111+
this.prefetchDepth = prefetchDepth;
112+
return this;
113+
}
114+
115+
@NotNull
116+
public SegmentCache build(@Nullable Function<UUID, SegmentId> uuidToSegmentId, @Nullable Function<SegmentId, Segment> segmentLoader) {
117+
if (prefetchThreads > 0 && prefetchDepth > 0) {
118+
checkArgument(uuidToSegmentId != null, "uuidToSegmentId must be provided when prefetching is enabled");
119+
checkArgument(segmentLoader != null, "segmentLoader must be provided when prefetching is enabled");
120+
}
121+
return SegmentCache.newSegmentCache(cacheSizeMB, prefetchThreads, prefetchDepth, uuidToSegmentId, segmentLoader);
122+
}
123+
124+
public int getSegmentCacheSize() {
125+
return cacheSizeMB;
126+
}
127+
128+
@Override
129+
public String toString() {
130+
return "Config{" +
131+
"cacheSizeMB=" + cacheSizeMB +
132+
", prefetchThreads=" + prefetchThreads +
133+
", prefetchDepth=" + prefetchDepth +
134+
'}';
135+
}
136+
}
137+
73138
/**
74139
* Retrieve an segment from the cache or load it and cache it if not yet in
75140
* the cache.
@@ -109,6 +174,11 @@ public abstract Segment getSegment(@NotNull SegmentId id, @NotNull Callable<Segm
109174
*/
110175
public abstract void recordHit();
111176

177+
@Override
178+
public void close() {
179+
// nothing to do
180+
}
181+
112182
private static class NonEmptyCache extends SegmentCache {
113183

114184
/**
@@ -129,8 +199,8 @@ private static class NonEmptyCache extends SegmentCache {
129199
*
130200
* @param cacheSizeMB size of the cache in megabytes.
131201
*/
132-
private NonEmptyCache(long cacheSizeMB) {
133-
long maximumWeight = cacheSizeMB * 1024 * 1024;
202+
private NonEmptyCache(int cacheSizeMB) {
203+
long maximumWeight = cacheSizeMB * 1024 * 1024L;
134204
this.cache = CacheBuilder.newBuilder()
135205
.concurrencyLevel(16)
136206
.maximumWeight(maximumWeight)
@@ -215,6 +285,124 @@ public void recordHit() {
215285
}
216286
}
217287

288+
289+
private static class PrefetchCache extends NonEmptyCache {
290+
291+
private static final Logger LOG = LoggerFactory.getLogger(PrefetchCache.class);
292+
293+
private final ThreadPoolExecutor prefetchPool;
294+
295+
private final int prefetchDepth;
296+
297+
private final Function<UUID, SegmentId> uuidToSegmentId;
298+
299+
private final Function<SegmentId, Segment> segmentLoader;
300+
301+
/**
302+
* Create a new cache of the given size.
303+
*
304+
* @param cacheSizeMB size of the cache in megabytes.
305+
* @param prefetchThreads the number of threads to use for prefetching
306+
* @param prefetchDepth the depth to prefetch
307+
* @param uuidToSegmentId the function to get the SegmentId by its UUID
308+
*/
309+
private PrefetchCache(int cacheSizeMB, int prefetchThreads, int prefetchDepth, @NotNull Function<UUID, SegmentId> uuidToSegmentId, @NotNull Function<SegmentId, Segment> segmentLoader) {
310+
super(cacheSizeMB);
311+
this.prefetchPool = new ThreadPoolExecutor(prefetchThreads, prefetchThreads,
312+
30, TimeUnit.SECONDS,
313+
new LinkedBlockingQueue<>(prefetchThreads * 8),
314+
r -> {
315+
String threadName = String.format("segment-prefetch-%s",
316+
Long.toHexString(System.nanoTime() & 0xFFFFF));
317+
return new Thread(r, threadName) {
318+
{
319+
setUncaughtExceptionHandler((t, e) -> {
320+
if (!(e instanceof InterruptedException)) {
321+
LOG.warn("Uncaught exception in thread {}", t.getName(), e);
322+
}
323+
});
324+
}
325+
};
326+
},
327+
new ThreadPoolExecutor.DiscardPolicy());
328+
this.prefetchPool.allowCoreThreadTimeOut(true);
329+
this.prefetchDepth = prefetchDepth;
330+
this.segmentLoader = segmentLoader;
331+
this.uuidToSegmentId = uuidToSegmentId;
332+
}
333+
334+
@Override
335+
public @NotNull Segment getSegment(@NotNull SegmentId id, @NotNull Callable<Segment> loader) throws ExecutionException {
336+
return super.getSegment(id, () -> {
337+
Segment s = loader.call();
338+
if (s != null && id.isDataSegmentId()) {
339+
prefetchPool.execute(new PrefetchRunnable(s, prefetchDepth));
340+
}
341+
return s;
342+
});
343+
}
344+
345+
@Override
346+
public void close() {
347+
super.close();
348+
try {
349+
prefetchPool.shutdown();
350+
if (!prefetchPool.awaitTermination(10, TimeUnit.SECONDS)) {
351+
prefetchPool.shutdownNow();
352+
}
353+
} catch (InterruptedException e) {
354+
Thread.currentThread().interrupt();
355+
prefetchPool.shutdownNow();
356+
}
357+
}
358+
359+
private class PrefetchRunnable implements Runnable {
360+
private final Segment segment;
361+
private final int depth;
362+
363+
PrefetchRunnable(Segment segment, int depth) {
364+
this.segment = segment;
365+
this.depth = depth;
366+
}
367+
368+
@Override
369+
public void run() {
370+
SegmentId segmentId = segment.getSegmentId();
371+
if (SegmentId.isDataSegmentId(segmentId.getLeastSignificantBits())) {
372+
for (int i = 0; i < segment.getReferencedSegmentIdCount(); i++) {
373+
UUID referencedSegmentId = segment.getReferencedSegmentId(i);
374+
SegmentId id = uuidToSegmentId.apply(referencedSegmentId);
375+
prefetchPool.execute(() -> {
376+
try {
377+
PrefetchCache.super.getSegment(id, () -> {
378+
Segment s = null;
379+
try {
380+
LOG.debug("Prefetching segment {}, referenced by segment {}", id, segmentId);
381+
return s = segmentLoader.apply(id);
382+
} catch (SegmentNotFoundException e) {
383+
LOG.warn("SegmentNotFoundException during prefetch of segment {}, referenced by segment {}", id, segmentId);
384+
throw e;
385+
} finally {
386+
if (s != null && depth > 0) {
387+
prefetchPool.execute(new PrefetchRunnable(s, depth - 1));
388+
}
389+
}
390+
});
391+
} catch (ExecutionException e) {
392+
LOG.warn("Error prefetching segment {}", id, e);
393+
}
394+
});
395+
}
396+
}
397+
}
398+
399+
@Override
400+
public String toString() {
401+
return "PrefetchRunnable{segment=" + segment.getSegmentId() + '}';
402+
}
403+
}
404+
}
405+
218406
/** An always empty cache */
219407
private static class EmptyCache extends SegmentCache {
220408
private final Stats stats = new Stats(NAME, 0, () -> 0L);

oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.apache.jackrabbit.oak.segment.file;
2020

21-
import static org.apache.jackrabbit.oak.segment.SegmentCache.newSegmentCache;
2221
import static org.apache.jackrabbit.oak.segment.data.SegmentData.newSegmentData;
2322

2423
import java.io.Closeable;
@@ -130,7 +129,7 @@ public void recoverEntry(UUID uuid, byte[] data, EntryRecovery entryRecovery) th
130129
protected final IOMonitor ioMonitor;
131130

132131
protected final RemoteStoreMonitor remoteStoreMonitor;
133-
132+
134133
protected final int binariesInlineThreshold;
135134

136135
AbstractFileStore(final FileStoreBuilder builder) {
@@ -142,7 +141,10 @@ public SegmentId newSegmentId(long msb, long lsb) {
142141
}
143142
});
144143
this.blobStore = builder.getBlobStore();
145-
this.segmentCache = newSegmentCache(builder.getSegmentCacheSize());
144+
this.segmentCache = builder.getSegmentCacheConfig().build(
145+
uuid -> tracker.newSegmentId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()),
146+
segmentId -> readSegmentUncached(getTarFiles(), segmentId)
147+
);
146148
this.segmentReader = new CachingSegmentReader(
147149
this::getWriter,
148150
blobStore,

oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,7 @@ public void close() {
487487
closer.register(repositoryLock::unlock);
488488
closer.register(tarFiles) ;
489489
closer.register(revisions);
490+
closer.register(segmentCache);
490491

491492
closeAndLogOnFail(closer);
492493
}

oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,15 @@
3333
import java.io.IOException;
3434
import java.util.HashSet;
3535
import java.util.Set;
36+
import java.util.function.Consumer;
3637

3738
import org.apache.jackrabbit.oak.commons.conditions.Validate;
3839
import org.apache.jackrabbit.oak.segment.CacheWeights.NodeCacheWeigher;
3940
import org.apache.jackrabbit.oak.segment.CacheWeights.StringCacheWeigher;
4041
import org.apache.jackrabbit.oak.segment.CacheWeights.TemplateCacheWeigher;
4142
import org.apache.jackrabbit.oak.segment.RecordCache;
4243
import org.apache.jackrabbit.oak.segment.Segment;
44+
import org.apache.jackrabbit.oak.segment.SegmentCache;
4345
import org.apache.jackrabbit.oak.segment.SegmentNotFoundExceptionListener;
4446
import org.apache.jackrabbit.oak.segment.WriterCacheManager;
4547
import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
@@ -79,7 +81,8 @@ public class FileStoreBuilder {
7981

8082
private int maxFileSize = DEFAULT_MAX_FILE_SIZE;
8183

82-
private int segmentCacheSize = DEFAULT_SEGMENT_CACHE_MB;
84+
private final SegmentCache.Config segmentCacheConfig = new SegmentCache.Config()
85+
.withCacheSizeMB(DEFAULT_SEGMENT_CACHE_MB);
8386

8487
private int stringCacheSize = DEFAULT_STRING_CACHE_MB;
8588

@@ -94,7 +97,7 @@ public class FileStoreBuilder {
9497
private boolean memoryMapping = MEMORY_MAPPING_DEFAULT;
9598

9699
private boolean offHeapAccess = getBoolean("access.off.heap");
97-
100+
98101
private int binariesInlineThreshold = Segment.MEDIUM_LIMIT;
99102

100103
private SegmentNodeStorePersistence persistence;
@@ -192,7 +195,18 @@ public FileStoreBuilder withMaxFileSize(int maxFileSize) {
192195
*/
193196
@NotNull
194197
public FileStoreBuilder withSegmentCacheSize(int segmentCacheSize) {
195-
this.segmentCacheSize = segmentCacheSize;
198+
this.segmentCacheConfig.withCacheSizeMB(segmentCacheSize);
199+
return this;
200+
}
201+
/**
202+
* Configure the segment cache.
203+
*
204+
* @param segmentCacheConfigurer Callback to configure segment cache
205+
* @return this instance
206+
*/
207+
@NotNull
208+
public FileStoreBuilder withSegmentCache(Consumer<SegmentCache.Config> segmentCacheConfigurer) {
209+
segmentCacheConfigurer.accept(this.segmentCacheConfig);
196210
return this;
197211
}
198212

@@ -397,7 +411,7 @@ public FileStoreBuilder withEagerSegmentCaching(boolean eagerSegmentCaching) {
397411
this.eagerSegmentCaching = eagerSegmentCaching;
398412
return this;
399413
}
400-
414+
401415
/**
402416
* Sets the threshold under which binaries are inlined in data segments.
403417
* @param binariesInlineThreshold the threshold
@@ -505,8 +519,9 @@ public int getMaxFileSize() {
505519
return maxFileSize;
506520
}
507521

508-
int getSegmentCacheSize() {
509-
return segmentCacheSize;
522+
@NotNull
523+
SegmentCache.Config getSegmentCacheConfig() {
524+
return segmentCacheConfig;
510525
}
511526

512527
int getStringCacheSize() {
@@ -585,7 +600,7 @@ boolean getStrictVersionCheck() {
585600
boolean getEagerSegmentCaching() {
586601
return eagerSegmentCaching;
587602
}
588-
603+
589604
int getBinariesInlineThreshold() {
590605
return binariesInlineThreshold;
591606
}
@@ -598,7 +613,7 @@ public String toString() {
598613
", blobStore=" + blobStore +
599614
", binariesInlineThreshold=" + binariesInlineThreshold +
600615
", maxFileSize=" + maxFileSize +
601-
", segmentCacheSize=" + segmentCacheSize +
616+
", segmentCacheConfig=" + segmentCacheConfig +
602617
", stringCacheSize=" + stringCacheSize +
603618
", templateCacheSize=" + templateCacheSize +
604619
", stringDeduplicationCacheSize=" + stringDeduplicationCacheSize +

0 commit comments

Comments
 (0)