32
32
import java .util .Map ;
33
33
import java .util .Optional ;
34
34
import java .util .Set ;
35
- import java .util .concurrent .CompletableFuture ;
36
35
import java .util .concurrent .ConcurrentHashMap ;
36
+ import java .util .concurrent .ExecutorService ;
37
+ import java .util .concurrent .Executors ;
37
38
import java .util .concurrent .ScheduledExecutorService ;
38
39
import java .util .concurrent .TimeUnit ;
39
40
import java .util .concurrent .locks .Lock ;
@@ -85,12 +86,15 @@ public class LocalNodeRegistry implements NodeRegistry {
85
86
private final Map <NodeId , Runnable > allChecks = new ConcurrentHashMap <>();
86
87
private final ReadWriteLock lock = new ReentrantReadWriteLock (/* fair */ true );
87
88
private final ScheduledExecutorService nodeHealthCheckService ;
89
+ private final ExecutorService nodeHealthCheckExecutor ;
88
90
private final Duration purgeNodesInterval ;
89
91
private final ScheduledExecutorService purgeDeadNodesService ;
92
+ private final int newSessionThreadPoolSize ;
90
93
91
94
public LocalNodeRegistry (
92
95
Tracer tracer ,
93
96
EventBus bus ,
97
+ int newSessionThreadPoolSize ,
94
98
HttpClient .Factory clientFactory ,
95
99
Secret registrationSecret ,
96
100
Duration healthcheckInterval ,
@@ -106,6 +110,7 @@ public LocalNodeRegistry(
106
110
Require .nonNull ("Node health check service" , nodeHealthCheckService );
107
111
this .purgeNodesInterval = Require .nonNull ("Purge nodes interval" , purgeNodesInterval );
108
112
this .purgeDeadNodesService = Require .nonNull ("Purge dead nodes service" , purgeDeadNodesService );
113
+ this .newSessionThreadPoolSize = newSessionThreadPoolSize ;
109
114
110
115
this .model = new LocalGridModel (bus );
111
116
this .nodes = new ConcurrentHashMap <>();
@@ -134,6 +139,16 @@ public LocalNodeRegistry(
134
139
healthcheckInterval .toMillis (),
135
140
TimeUnit .MILLISECONDS );
136
141
142
+ this .nodeHealthCheckExecutor =
143
+ Executors .newFixedThreadPool (
144
+ this .newSessionThreadPoolSize ,
145
+ r -> {
146
+ Thread t = new Thread (r );
147
+ t .setName ("node-health-check-" + t .getId ());
148
+ t .setDaemon (true );
149
+ return t ;
150
+ });
151
+
137
152
// Schedule node purging if interval is non-zero
138
153
if (!this .purgeNodesInterval .isZero ()) {
139
154
this .purgeDeadNodesService .scheduleAtFixedRate (
@@ -309,10 +324,9 @@ public void runHealthChecks() {
309
324
310
325
// Large deployments: process in parallel batches with controlled concurrency
311
326
int batchSize = Math .max (10 , total / 10 );
312
- int maxConcurrentBatches = Math .min (5 , Runtime .getRuntime ().availableProcessors ());
313
327
314
328
List <List <Runnable >> batches = partition (checks , batchSize );
315
- processBatchesInParallel (batches , maxConcurrentBatches );
329
+ processBatchesInParallel (batches );
316
330
}
317
331
318
332
@ Override
@@ -400,35 +414,28 @@ public boolean isReady() {
400
414
}
401
415
}
402
416
403
- private void processBatchesInParallel (List <List <Runnable >> batches , int maxConcurrentBatches ) {
417
+ private void processBatchesInParallel (List <List <Runnable >> batches ) {
404
418
if (batches .isEmpty ()) {
405
419
return ;
406
420
}
407
- List <CompletableFuture <Void >> inFlight = new ArrayList <>();
408
- for (List <Runnable > batch : batches ) {
409
- CompletableFuture <Void > fut =
410
- CompletableFuture .runAsync (
411
- () ->
412
- batch .parallelStream ()
413
- .forEach (
414
- r -> {
415
- try {
416
- r .run ();
417
- } catch (Throwable t ) {
418
- LOG .log (
419
- getDebugLogLevel (), "Health check execution failed in batch" , t );
420
- }
421
- }),
422
- nodeHealthCheckService );
423
- inFlight .add (fut );
424
- if (inFlight .size () >= maxConcurrentBatches ) {
425
- CompletableFuture .allOf (inFlight .toArray (new CompletableFuture [0 ])).join ();
426
- inFlight .clear ();
427
- }
428
- }
429
- if (!inFlight .isEmpty ()) {
430
- CompletableFuture .allOf (inFlight .toArray (new CompletableFuture [0 ])).join ();
431
- }
421
+
422
+ // Process all batches with controlled parallelism
423
+ batches .forEach (
424
+ batch ->
425
+ nodeHealthCheckExecutor .submit (
426
+ () ->
427
+ batch .parallelStream ()
428
+ .forEach (
429
+ r -> {
430
+ try {
431
+ r .run ();
432
+ } catch (Throwable t ) {
433
+ LOG .log (
434
+ getDebugLogLevel (),
435
+ "Health check execution failed in batch" ,
436
+ t );
437
+ }
438
+ })));
432
439
}
433
440
434
441
private static List <List <Runnable >> partition (List <Runnable > list , int size ) {
0 commit comments