2525
2626import java .io .Closeable ;
2727import java .io .IOException ;
28+ import java .time .Instant ;
2829import java .util .UUID ;
30+ import java .util .concurrent .BlockingQueue ;
2931import java .util .concurrent .Callable ;
3032import java .util .concurrent .ExecutionException ;
3133import java .util .concurrent .LinkedBlockingQueue ;
34+ import java .util .concurrent .RejectedExecutionHandler ;
3235import java .util .concurrent .ThreadPoolExecutor ;
3336import java .util .concurrent .TimeUnit ;
3437import java .util .concurrent .atomic .AtomicInteger ;
@@ -308,12 +311,11 @@ private static class PrefetchCache extends NonEmptyCache {
308311 */
309312 private PrefetchCache (int cacheSizeMB , int prefetchThreads , int prefetchDepth , @ NotNull Function <UUID , SegmentId > uuidToSegmentId , @ NotNull Function <SegmentId , Segment > segmentLoader ) {
310313 super (cacheSizeMB );
311- this .prefetchPool = new ThreadPoolExecutor (prefetchThreads , prefetchThreads ,
314+ this .prefetchPool = new ThreadPoolExecutor (Math . max ( 1 , Math . round ( prefetchThreads / 2f )) , prefetchThreads ,
312315 30 , TimeUnit .SECONDS ,
313- new LinkedBlockingQueue <>(prefetchThreads * 8 ),
316+ new LinkedBlockingQueue <>(prefetchThreads * 2 ), // TODO - increase queue size
314317 r -> {
315- String threadName = String .format ("segment-prefetch-%s" ,
316- Long .toHexString (System .nanoTime () & 0xFFFFF ));
318+ String threadName = String .format ("segment-prefetch-%s" , Long .toHexString (System .nanoTime () & 0xFFFFF ));
317319 return new Thread (r , threadName ) {
318320 {
319321 setUncaughtExceptionHandler ((t , e ) -> {
@@ -324,7 +326,26 @@ private PrefetchCache(int cacheSizeMB, int prefetchThreads, int prefetchDepth, @
324326 }
325327 };
326328 },
327- new ThreadPoolExecutor .DiscardPolicy ());
329+ new RejectedExecutionHandler () {
330+ @ Override
331+ public void rejectedExecution (Runnable r , ThreadPoolExecutor executor ) {
332+ if (!executor .isShutdown () && r instanceof PrefetchRunnable && ((PrefetchRunnable ) r ).isExpedite ()) {
333+ PrefetchRunnable prefetchRunnable = (PrefetchRunnable ) r ;
334+ // make space for the redispatch of the expedited prefetch task
335+ Runnable oldest = executor .getQueue ().poll ();
336+ // re-dispatch from a pool thread as un-expedited prefetch tasks,
337+ // takes the dispatching load off the critical path
338+ executor .execute (() -> {
339+ if (oldest != null ) {
340+ executor .execute (oldest );
341+ }
342+ executor .execute (new PrefetchRunnable (prefetchRunnable .segment , prefetchRunnable .depth , false ));
343+ });
344+ } else {
345+ LOG .info ("The prefetch queue is full, dropping prefetch task {}" , r );
346+ }
347+ }
348+ });
328349 this .prefetchPool .allowCoreThreadTimeOut (true );
329350 this .prefetchDepth = prefetchDepth ;
330351 this .segmentLoader = segmentLoader ;
@@ -334,9 +355,13 @@ private PrefetchCache(int cacheSizeMB, int prefetchThreads, int prefetchDepth, @
334355 @ Override
335356 public @ NotNull Segment getSegment (@ NotNull SegmentId id , @ NotNull Callable <Segment > loader ) throws ExecutionException {
336357 return super .getSegment (id , () -> {
358+ Instant start = Instant .now ();
337359 Segment s = loader .call ();
360+ LOG .info ("Segment {} loaded on critical path ({}ms)" , id , Instant .now ().toEpochMilli () - start .toEpochMilli ());
338361 if (s != null && id .isDataSegmentId ()) {
339- prefetchPool .execute (new PrefetchRunnable (s , prefetchDepth ));
362+ start = Instant .now ();
363+ prefetchPool .execute (new PrefetchRunnable (s , prefetchDepth , true ));
364+ LOG .info ("Reference prefetch for segment {} enqueued on critical path ({}ms)" , id , Instant .now ().toEpochMilli () - start .toEpochMilli ());
340365 }
341366 return s ;
342367 });
@@ -357,12 +382,17 @@ public void close() {
357382 }
358383
359384 private class PrefetchRunnable implements Runnable {
385+
360386 private final Segment segment ;
387+
361388 private final int depth ;
362389
363- PrefetchRunnable (Segment segment , int depth ) {
390+ private final boolean expedite ;
391+
392+ PrefetchRunnable (Segment segment , int depth , boolean expedite ) {
364393 this .segment = segment ;
365394 this .depth = depth ;
395+ this .expedite = expedite ;
366396 }
367397
368398 @ Override
@@ -384,7 +414,12 @@ public void run() {
384414 throw e ;
385415 } finally {
386416 if (s != null && depth > 0 ) {
387- prefetchPool .execute (new PrefetchRunnable (s , depth - 1 ));
417+ BlockingQueue <Runnable > queue = prefetchPool .getQueue ();
418+ if (queue .remainingCapacity () < queue .size () / 4 ) {
419+ // throttle the enqueuing of prefetch tasks
420+ TimeUnit .SECONDS .sleep (5 );
421+ }
422+ prefetchPool .execute (new PrefetchRunnable (s , depth - 1 , false ));
388423 }
389424 }
390425 });
@@ -400,6 +435,10 @@ public void run() {
400435 public String toString () {
401436 return "PrefetchRunnable{segment=" + segment .getSegmentId () + '}' ;
402437 }
438+
439+ public boolean isExpedite () {
440+ return expedite ;
441+ }
403442 }
404443 }
405444
0 commit comments