@@ -311,9 +311,9 @@ private static class PrefetchCache extends NonEmptyCache {
311311 */
312312 private PrefetchCache (int cacheSizeMB , int prefetchThreads , int prefetchDepth , @ NotNull Function <UUID , SegmentId > uuidToSegmentId , @ NotNull Function <SegmentId , Segment > segmentLoader ) {
313313 super (cacheSizeMB );
314- this .prefetchPool = new ThreadPoolExecutor (Math . max ( 1 , Math . round ( prefetchThreads / 2f )) , prefetchThreads ,
315- 30 , TimeUnit .SECONDS ,
316- new LinkedBlockingQueue <>(prefetchThreads * 2 ), // TODO - increase queue size
314+ this .prefetchPool = new ThreadPoolExecutor (prefetchThreads , prefetchThreads ,
315+ 10 , TimeUnit .SECONDS ,
316+ new LinkedBlockingQueue <>(prefetchThreads * 16 ),
317317 r -> {
318318 String threadName = String .format ("segment-prefetch-%s" , Long .toHexString (System .nanoTime () & 0xFFFFF ));
319319 return new Thread (r , threadName ) {
@@ -332,17 +332,10 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
332332 if (!executor .isShutdown () && r instanceof PrefetchRunnable && ((PrefetchRunnable ) r ).isExpedite ()) {
333333 PrefetchRunnable prefetchRunnable = (PrefetchRunnable ) r ;
334334 // make space for the redispatch of the expedited prefetch task
335- Runnable oldest = executor .getQueue ().poll ();
335+ executor .getQueue ().poll ();
336336 // re-dispatch from a pool thread as un-expedited prefetch tasks,
337337 // takes the dispatching load off the critical path
338- executor .execute (() -> {
339- if (oldest != null ) {
340- executor .execute (oldest );
341- }
342- executor .execute (new PrefetchRunnable (prefetchRunnable .segment , prefetchRunnable .depth , false ));
343- });
344- } else {
345- LOG .info ("The prefetch queue is full, dropping prefetch task {}" , r );
338+ executor .execute (new PrefetchRunnable (prefetchRunnable .segment , prefetchRunnable .depth , false ));
346339 }
347340 }
348341 });
@@ -355,13 +348,16 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
355348 @ Override
356349 public @ NotNull Segment getSegment (@ NotNull SegmentId id , @ NotNull Callable <Segment > loader ) throws ExecutionException {
357350 return super .getSegment (id , () -> {
358- Instant start = Instant .now ();
359351 Segment s = loader .call ();
360- LOG .info ("Segment {} loaded on critical path ({}ms)" , id , Instant .now ().toEpochMilli () - start .toEpochMilli ());
361352 if (s != null && id .isDataSegmentId ()) {
362- start = Instant . now ();
353+ long start = System . nanoTime ();
363354 prefetchPool .execute (new PrefetchRunnable (s , prefetchDepth , true ));
364- LOG .info ("Reference prefetch for segment {} enqueued on critical path ({}ms)" , id , Instant .now ().toEpochMilli () - start .toEpochMilli ());
355+ long micros = TimeUnit .NANOSECONDS .toMicros (System .nanoTime () - start );
356+ if (micros > 10_000 ) { // log only if it took more than 10ms
357+ LOG .info ("Reference prefetch for segment {} enqueued in {}µs" , id , micros );
358+ } else if (LOG .isDebugEnabled ()) {
359+ LOG .debug ("Reference prefetch for segment {} enqueued in {}µs" , id , micros );
360+ }
365361 }
366362 return s ;
367363 });
@@ -414,11 +410,6 @@ public void run() {
414410 throw e ;
415411 } finally {
416412 if (s != null && depth > 0 ) {
417- BlockingQueue <Runnable > queue = prefetchPool .getQueue ();
418- if (queue .remainingCapacity () < queue .size () / 4 ) {
419- // throttle the enqueuing of prefetch tasks
420- TimeUnit .SECONDS .sleep (5 );
421- }
422413 prefetchPool .execute (new PrefetchRunnable (s , depth - 1 , false ));
423414 }
424415 }
0 commit comments