2121import org .apache .jackrabbit .oak .commons .Buffer ;
2222import org .apache .jackrabbit .oak .commons .internal .function .Suppliers ;
2323import org .apache .jackrabbit .oak .segment .SegmentId ;
24- import org .apache .jackrabbit .oak .segment .SegmentNotFoundException ;
2524import org .apache .jackrabbit .oak .segment .file .tar .TarFiles ;
2625import org .apache .jackrabbit .oak .segment .spi .persistence .persistentcache .DelegatingPersistentCache ;
2726import org .apache .jackrabbit .oak .segment .spi .persistence .persistentcache .PersistentCache ;
2827import org .apache .jackrabbit .oak .segment .spi .persistence .persistentcache .PersistentCachePreloadingConfiguration ;
2928import org .jetbrains .annotations .NotNull ;
3029import org .jetbrains .annotations .Nullable ;
31- import org .jetbrains .annotations .VisibleForTesting ;
3230import org .slf4j .Logger ;
3331import org .slf4j .LoggerFactory ;
3432
5351
5452/**
5553 * A {@link PersistentCache} decorator that preloads segments into the cache by
56- * asynchronously prefetching segments referenced by a segment that is being read
54+ * asynchronously preloading segments referenced by a segment that is being read
5755 * from the cache.
5856 *
5957 * @see PersistentCachePreloadingConfiguration
@@ -70,9 +68,9 @@ public class SegmentPreloader extends DelegatingPersistentCache implements Close
7068
7169 private final ExecutorService dispatchPool ;
7270
73- private final ExecutorService prefetchPool ;
71+ private final ExecutorService preloadPool ;
7472
75- private final int prefetchDepth ;
73+ private final int preloadDepth ;
7674
7775 private final Supplier <TarFiles > tarFiles ;
7876
@@ -87,7 +85,7 @@ public class SegmentPreloader extends DelegatingPersistentCache implements Close
8785 * @return the decorated cache or the given {@code delegate} if no preloading is configured
8886 */
8987 public static @ NotNull PersistentCache decorate (@ NotNull PersistentCache delegate , @ NotNull PersistentCachePreloadingConfiguration config , @ NotNull Supplier <TarFiles > tarFiles ) {
90- if (config .getConcurrency () > 0 && config .getPrefetchDepth () > 0 ) {
88+ if (config .getConcurrency () > 0 && config .getMaxPreloadDepth () > 0 ) {
9189 return new SegmentPreloader (delegate , config , tarFiles );
9290 }
9391 return delegate ;
@@ -98,23 +96,23 @@ private SegmentPreloader(@NotNull PersistentCache delegate, @NotNull PersistentC
9896 this .tarFiles = Suppliers .memoize (tarFiles );
9997 this .inProgressPrefetch = new ConcurrentHashMap <>();
10098 this .graphCache = new ConcurrentHashMap <>();
101- this .prefetchDepth = config .getPrefetchDepth ();
99+ this .preloadDepth = config .getMaxPreloadDepth ();
102100 this .dispatchPool = new ThreadPoolExecutor (1 ,1 ,
103101 1 , TimeUnit .SECONDS ,
104102 new PriorityBlockingQueue <>(),
105- r -> new Thread (r , "segment-prefetch -dispatcher" )) {
103+ r -> new Thread (r , "segment-preload -dispatcher" )) {
106104 @ Override
107105 protected void afterExecute (Runnable r , Throwable t ) {
108106 super .afterExecute (r , t );
109107 clearInProgressTask (r );
110108 }
111109 };
112- int prefetchThreads = config .getConcurrency ();
113- this . prefetchPool = new ThreadPoolExecutor (Math .max (1 , prefetchThreads / 4 ), prefetchThreads ,
114- 10 , TimeUnit .SECONDS ,
115- new LinkedBlockingQueue <>(prefetchThreads * 4 ),
110+ int preloadThreads = config .getConcurrency ();
111+ ThreadPoolExecutor preloadPool = new ThreadPoolExecutor (Math .max (1 , preloadThreads / 4 ), preloadThreads ,
112+ 5 , TimeUnit .SECONDS ,
113+ new LinkedBlockingQueue <>(preloadThreads * 4 ),
116114 r -> {
117- String threadName = String .format ("segment-prefetch -%s" , Long .toHexString (System .nanoTime () & 0xFFFFF ));
115+ String threadName = String .format ("segment-preload -%s" , Long .toHexString (System .nanoTime () & 0xFFFFF ));
118116 Thread thread = new Thread (r , threadName );
119117 thread .setUncaughtExceptionHandler ((t , e ) -> {
120118 if (!(e instanceof InterruptedException )) {
@@ -126,7 +124,7 @@ protected void afterExecute(Runnable r, Throwable t) {
126124 (r , executor ) -> {
127125 try {
128126 // force the caller thread to wait for space in the queue (this is always a thread in the dispatchPool)
129- // this creates back-pressure to the dispatchPool, slowing down the dispatching of new prefetch tasks
127+ // this creates back-pressure to the dispatchPool, slowing down the dispatching of new preload tasks
130128 executor .getQueue ().put (r );
131129 } catch (InterruptedException e ) {
132130 Thread .currentThread ().interrupt ();
@@ -139,6 +137,8 @@ protected void afterExecute(Runnable r, Throwable t) {
139137 clearInProgressTask (r );
140138 }
141139 };
140+ preloadPool .allowCoreThreadTimeOut (true );
141+ this .preloadPool = preloadPool ;
142142 }
143143
144144 @ Override
@@ -148,16 +148,29 @@ protected PersistentCache delegate() {
148148
149149 @ Override
150150 public @ Nullable Buffer readSegment (long msb , long lsb , @ NotNull Callable <Buffer > loader ) {
151- dispatch (tarFiles . get (), msb , lsb );
151+ dispatch (msb , lsb );
152152 return delegate ().readSegment (msb , lsb , loader );
153153 }
154154
155- private void dispatch (@ NotNull TarFiles tarFiles , long msb , long lsb ) {
156- dispatch (tarFiles , tarFiles :: getIndices , msb , lsb , 0 );
155+ private void dispatch (long msb , long lsb ) {
156+ dispatch (msb , lsb , 1 );
157157 }
158158
159- private void dispatch (@ NotNull TarFiles tarFiles , Supplier <Map <String , Set <UUID >>> indicesSupplier , long msb , long lsb , int depth ) {
160- execute (dispatchPool , new PrefetchDispatchTask (tarFiles , indicesSupplier , msb , lsb , depth ));
159+ private void dispatch (long msb , long lsb , int depth ) {
160+ execute (dispatchPool , createDispatchTask (msb , lsb , depth ));
161+ }
162+
163+ @ NotNull SegmentPreloader .DispatchTask createDispatchTask (long msb , long lsb , int depth ) {
164+ TarFiles tars = tarFiles .get ();
165+ return new DispatchTask (tars , tars ::getIndices , msb , lsb , depth );
166+ }
167+
168+ private void preload (long msb , long lsb , int depth ) {
169+ execute (preloadPool , createPreloadTask (msb , lsb , depth ));
170+ }
171+
172+ @ NotNull SegmentPreloader .PreloadTask createPreloadTask (long msb , long lsb , int depth ) {
173+ return new PreloadTask (tarFiles .get (), msb , lsb , depth );
161174 }
162175
163176 private void execute (ExecutorService pool , Runnable r ) {
@@ -177,22 +190,22 @@ private void clearInProgressTask(Runnable r) {
177190 @ Override
178191 public void close () {
179192 try {
180- prefetchPool .shutdown ();
193+ preloadPool .shutdown ();
181194 dispatchPool .shutdown ();
182- if (!prefetchPool .awaitTermination (4 , TimeUnit .SECONDS )) {
183- prefetchPool .shutdownNow ();
195+ if (!preloadPool .awaitTermination (4 , TimeUnit .SECONDS )) {
196+ preloadPool .shutdownNow ();
184197 }
185198 if (!dispatchPool .awaitTermination (1 , TimeUnit .SECONDS )) {
186199 dispatchPool .shutdownNow ();
187200 }
188201 } catch (InterruptedException e ) {
189202 Thread .currentThread ().interrupt ();
190- prefetchPool .shutdownNow ();
203+ preloadPool .shutdownNow ();
191204 dispatchPool .shutdownNow ();
192205 }
193206 }
194207
195- private class PrefetchDispatchTask implements Runnable , Comparable <PrefetchDispatchTask > {
208+ class DispatchTask implements Runnable , Comparable <DispatchTask > {
196209
197210 private final TarFiles tarFiles ;
198211
@@ -206,13 +219,13 @@ private class PrefetchDispatchTask implements Runnable, Comparable<PrefetchDispa
206219
207220 private final long creationTime = System .nanoTime ();
208221
209- PrefetchDispatchTask (@ NotNull TarFiles tarFiles , Supplier <Map <String , Set <UUID >>> indicesSupplier , long msb , long lsb , int depth ) {
210- checkArgument (depth < prefetchDepth , "depth must be < %d, is %d" , prefetchDepth , depth );
222+ private DispatchTask (@ NotNull TarFiles tarFiles , Supplier <Map <String , Set <UUID >>> indicesSupplier , long msb , long lsb , int depth ) {
223+ checkArgument (depth <= preloadDepth , "depth must be <= %d, is %d" , preloadDepth , depth );
211224 this .tarFiles = tarFiles ;
212225 this .indicesSupplier = indicesSupplier ;
213226 this .msb = msb ;
214227 this .lsb = lsb ;
215- this .depth = depth + 1 ;
228+ this .depth = depth ;
216229 LOG .debug ("Created: {}" , this );
217230 }
218231
@@ -239,26 +252,22 @@ public void run() {
239252 long refMsb = reference .getMostSignificantBits ();
240253 long refLsb = reference .getLeastSignificantBits ();
241254 if (!delegate .containsSegment (refMsb , refLsb )) {
242- prefetch ( tarFiles , () -> indices , refMsb , refLsb , depth );
243- } else if (depth < prefetchDepth && SegmentId .isDataSegmentId (refLsb )) {
244- dispatch (tarFiles , () -> indices , refMsb , refLsb , depth );
255+ preload ( refMsb , refLsb , depth );
256+ } else if (depth < preloadDepth && SegmentId .isDataSegmentId (refLsb )) {
257+ dispatch (refMsb , refLsb , depth + 1 );
245258 }
246259 }
247260 }
248261
249- private void prefetch (TarFiles tarFiles , Supplier <Map <String , Set <UUID >>> indicesSupplier , long msb , long lsb , int depth ) {
250- execute (prefetchPool , new PrefetchTask (tarFiles , indicesSupplier , msb , lsb , depth ));
251- }
252-
253262 @ Override
254263 public boolean equals (Object o ) {
255264 if (this == o ) {
256265 return true ;
257266 }
258- if (o .getClass () != PrefetchDispatchTask .class ) {
267+ if (o .getClass () != DispatchTask .class ) {
259268 return false ;
260269 }
261- PrefetchDispatchTask that = (PrefetchDispatchTask ) o ;
270+ DispatchTask that = (DispatchTask ) o ;
262271 return msb == that .msb && lsb == that .lsb && depth == that .depth ;
263272 }
264273
@@ -269,10 +278,10 @@ public int hashCode() {
269278
270279 @ Override
271280 public String toString () {
272- return "PrefetchDispatchTask {segmentId=" + new UUID (msb , lsb ) + ", depth=" + depth + '}' ;
281+ return "DispatchTask {segmentId=" + new UUID (msb , lsb ) + ", depth=" + depth + '}' ;
273282 }
274283
275- private int getPrefetchDepth () {
284+ private int getPreloadDepth () {
276285 return depth ;
277286 }
278287
@@ -281,30 +290,27 @@ private long getCreationTime() {
281290 }
282291
283292 @ Override
284- public int compareTo (@ NotNull SegmentPreloader .PrefetchDispatchTask o ) {
293+ public int compareTo (@ NotNull SegmentPreloader .DispatchTask o ) {
285294 return Comparator
286- .comparing (PrefetchDispatchTask :: getPrefetchDepth )
287- .thenComparing (PrefetchDispatchTask ::getCreationTime )
295+ .comparing (DispatchTask :: getPreloadDepth )
296+ .thenComparing (DispatchTask ::getCreationTime )
288297 .compare (this , o );
289298 }
290299 }
291300
292- private class PrefetchTask implements Runnable {
301+ class PreloadTask implements Runnable {
293302
294303 private final TarFiles tarFiles ;
295304
296- private final Supplier <Map <String , Set <UUID >>> indicesSupplier ;
297-
298305 private final long msb ;
299306
300307 private final long lsb ;
301308
302309 private final int depth ;
303310
304- PrefetchTask (TarFiles tarFiles , Supplier < Map < String , Set < UUID >>> indicesSupplier , long msb , long lsb , int depth ) {
305- checkArgument (depth <= prefetchDepth , "depth must be <= %d, is %d" , prefetchDepth , depth );
311+ private PreloadTask (TarFiles tarFiles , long msb , long lsb , int depth ) {
312+ checkArgument (depth <= preloadDepth , "depth must be <= %d, is %d" , preloadDepth , depth );
306313 this .tarFiles = tarFiles ;
307- this .indicesSupplier = indicesSupplier ;
308314 this .msb = msb ;
309315 this .lsb = lsb ;
310316 this .depth = depth ;
@@ -314,8 +320,8 @@ private class PrefetchTask implements Runnable {
314320 @ Override
315321 public void run () {
316322 LOG .debug ("Running: {}" , this );
317- if (depth < prefetchDepth && SegmentId .isDataSegmentId (lsb )) {
318- dispatch (tarFiles , indicesSupplier , msb , lsb , depth );
323+ if (depth < preloadDepth && SegmentId .isDataSegmentId (lsb )) {
324+ dispatch (msb , lsb , depth + 1 );
319325 }
320326 if (!delegate .containsSegment (msb , lsb )) {
321327 Buffer segmentBuffer = tarFiles .readSegment (msb , lsb );
@@ -330,10 +336,10 @@ public boolean equals(Object o) {
330336 if (this == o ) {
331337 return true ;
332338 }
333- if (o .getClass () != PrefetchTask .class ) {
339+ if (o .getClass () != PreloadTask .class ) {
334340 return false ;
335341 }
336- PrefetchTask that = (PrefetchTask ) o ;
342+ PreloadTask that = (PreloadTask ) o ;
337343 return msb == that .msb && lsb == that .lsb ;
338344 }
339345
@@ -344,7 +350,7 @@ public int hashCode() {
344350
345351 @ Override
346352 public String toString () {
347- return "PrefetchTask {segmentId=" + new UUID (msb , lsb ) + ", depth=" + depth + '}' ;
353+ return "PreloadTask {segmentId=" + new UUID (msb , lsb ) + ", depth=" + depth + '}' ;
348354 }
349355 }
350356}
0 commit comments