@@ -344,20 +344,12 @@ public void testFetchFullCacheEntry() throws Exception {
344344 .build ();
345345
346346 AtomicInteger bulkTaskCount = new AtomicInteger (0 );
347- ThreadPool threadPool = new TestThreadPool ("test" ) {
347+ final var threadPool = new TestThreadPool ("test" );
348+ final var bulkExecutor = new StoppableExecutorServiceWrapper (threadPool .generic ()) {
348349 @ Override
349- public ExecutorService executor (String name ) {
350- ExecutorService generic = super .executor (Names .GENERIC );
351- if (Objects .equals (name , "bulk" )) {
352- return new StoppableExecutorServiceWrapper (generic ) {
353- @ Override
354- public void execute (Runnable command ) {
355- super .execute (command );
356- bulkTaskCount .incrementAndGet ();
357- }
358- };
359- }
360- return generic ;
350+ public void execute (Runnable command ) {
351+ super .execute (command );
352+ bulkTaskCount .incrementAndGet ();
361353 }
362354 };
363355
@@ -368,7 +360,6 @@ public void execute(Runnable command) {
368360 settings ,
369361 threadPool ,
370362 ThreadPool .Names .GENERIC ,
371- "bulk" ,
372363 BlobCacheMetrics .NOOP
373364 )
374365 ) {
@@ -381,7 +372,7 @@ public void execute(Runnable command) {
381372 cacheService .maybeFetchFullEntry (cacheKey , size , (channel , channelPos , relativePos , length , progressUpdater ) -> {
382373 bytesRead .addAndGet (-length );
383374 progressUpdater .accept (length );
384- }, future );
375+ }, bulkExecutor , future );
385376
386377 future .get (10 , TimeUnit .SECONDS );
387378 assertEquals (0L , bytesRead .get ());
@@ -394,7 +385,7 @@ public void execute(Runnable command) {
394385 assertEquals (2 , cacheService .freeRegionCount ());
395386 var configured = cacheService .maybeFetchFullEntry (cacheKey , size (500 ), (ch , chPos , relPos , len , update ) -> {
396387 throw new AssertionError ("Should never reach here" );
397- }, ActionListener .noop ());
388+ }, bulkExecutor , ActionListener .noop ());
398389 assertFalse (configured );
399390 assertEquals (2 , cacheService .freeRegionCount ());
400391 }
@@ -411,16 +402,8 @@ public void testFetchFullCacheEntryConcurrently() throws Exception {
411402 .put ("path.home" , createTempDir ())
412403 .build ();
413404
414- ThreadPool threadPool = new TestThreadPool ("test" ) {
415- @ Override
416- public ExecutorService executor (String name ) {
417- ExecutorService generic = super .executor (Names .GENERIC );
418- if (Objects .equals (name , "bulk" )) {
419- return new StoppableExecutorServiceWrapper (generic );
420- }
421- return generic ;
422- }
423- };
405+ final var threadPool = new TestThreadPool ("test" );
406+ final var bulkExecutor = new StoppableExecutorServiceWrapper (threadPool .generic ());
424407
425408 try (
426409 NodeEnvironment environment = new NodeEnvironment (settings , TestEnvironment .newEnvironment (settings ));
@@ -429,7 +412,6 @@ public ExecutorService executor(String name) {
429412 settings ,
430413 threadPool ,
431414 ThreadPool .Names .GENERIC ,
432- "bulk" ,
433415 BlobCacheMetrics .NOOP
434416 )
435417 ) {
@@ -446,6 +428,7 @@ public ExecutorService executor(String name) {
446428 cacheKey ,
447429 size ,
448430 (channel , channelPos , relativePos , length , progressUpdater ) -> progressUpdater .accept (length ),
431+ bulkExecutor ,
449432 f
450433 )
451434 );
@@ -677,7 +660,6 @@ public void testMaybeEvictRecentUsed() throws Exception {
677660 settings ,
678661 taskQueue .getThreadPool (),
679662 ThreadPool .Names .GENERIC ,
680- "bulk" ,
681663 BlobCacheMetrics .NOOP
682664 )
683665 ) {
@@ -716,21 +698,13 @@ public void testMaybeFetchRegion() throws Exception {
716698 .put ("path.home" , createTempDir ())
717699 .build ();
718700
719- AtomicInteger bulkTaskCount = new AtomicInteger (0 );
720- ThreadPool threadPool = new TestThreadPool ("test" ) {
701+ final var bulkTaskCount = new AtomicInteger (0 );
702+ final var threadPool = new TestThreadPool ("test" );
703+ final var bulkExecutor = new StoppableExecutorServiceWrapper (threadPool .generic ()) {
721704 @ Override
722- public ExecutorService executor (String name ) {
723- ExecutorService generic = super .executor (Names .GENERIC );
724- if (Objects .equals (name , "bulk" )) {
725- return new StoppableExecutorServiceWrapper (generic ) {
726- @ Override
727- public void execute (Runnable command ) {
728- super .execute (command );
729- bulkTaskCount .incrementAndGet ();
730- }
731- };
732- }
733- return generic ;
705+ public void execute (Runnable command ) {
706+ super .execute (command );
707+ bulkTaskCount .incrementAndGet ();
734708 }
735709 };
736710 try (
@@ -740,7 +714,6 @@ public void execute(Runnable command) {
740714 settings ,
741715 threadPool ,
742716 ThreadPool .Names .GENERIC ,
743- "bulk" ,
744717 BlobCacheMetrics .NOOP
745718 )
746719 ) {
@@ -754,7 +727,7 @@ public void execute(Runnable command) {
754727 cacheService .maybeFetchRegion (cacheKey , 0 , blobLength , (channel , channelPos , relativePos , length , progressUpdater ) -> {
755728 bytesRead .addAndGet (length );
756729 progressUpdater .accept (length );
757- }, future );
730+ }, bulkExecutor , future );
758731
759732 var fetched = future .get (10 , TimeUnit .SECONDS );
760733 assertThat ("Region has been fetched" , fetched , is (true ));
@@ -782,6 +755,7 @@ public void execute(Runnable command) {
782755 bytesRead .addAndGet (length );
783756 progressUpdater .accept (length );
784757 },
758+ bulkExecutor ,
785759 listener
786760 );
787761 }
@@ -802,7 +776,7 @@ public void execute(Runnable command) {
802776 cacheService .maybeFetchRegion (cacheKey , 0 , blobLength , (channel , channelPos , relativePos , length , progressUpdater ) -> {
803777 bytesRead .addAndGet (length );
804778 progressUpdater .accept (length );
805- }, future );
779+ }, bulkExecutor , future );
806780
807781 var fetched = future .get (10 , TimeUnit .SECONDS );
808782 assertThat ("Region has been fetched" , fetched , is (true ));
@@ -831,7 +805,6 @@ public void testPopulate() throws Exception {
831805 settings ,
832806 taskQueue .getThreadPool (),
833807 ThreadPool .Names .GENERIC ,
834- ThreadPool .Names .GENERIC ,
835808 BlobCacheMetrics .NOOP
836809 )
837810 ) {
@@ -924,7 +897,6 @@ public void testUseFullRegionSize() throws IOException {
924897 settings ,
925898 taskQueue .getThreadPool (),
926899 ThreadPool .Names .GENERIC ,
927- ThreadPool .Names .GENERIC ,
928900 BlobCacheMetrics .NOOP
929901 ) {
930902 @ Override
0 commit comments