|
32 | 32 | import java.util.Map;
|
33 | 33 | import java.util.Optional;
|
34 | 34 | import java.util.Set;
|
| 35 | +import java.util.concurrent.CompletableFuture; |
35 | 36 | import java.util.concurrent.ConcurrentHashMap;
|
36 | 37 | import java.util.concurrent.ScheduledExecutorService;
|
37 |
| -import java.util.concurrent.ScheduledFuture; |
38 | 38 | import java.util.concurrent.TimeUnit;
|
39 | 39 | import java.util.concurrent.locks.Lock;
|
40 | 40 | import java.util.concurrent.locks.ReadWriteLock;
|
@@ -83,7 +83,6 @@ public class LocalNodeRegistry implements NodeRegistry {
|
83 | 83 | private final GridModel model;
|
84 | 84 | private final Map<NodeId, Node> nodes;
|
85 | 85 | private final Map<NodeId, Runnable> allChecks = new ConcurrentHashMap<>();
|
86 |
| - private final Map<NodeId, ScheduledFuture<?>> scheduledHealthChecks = new ConcurrentHashMap<>(); |
87 | 86 | private final ReadWriteLock lock = new ReentrantReadWriteLock(/* fair */ true);
|
88 | 87 | private final ScheduledExecutorService nodeHealthCheckService;
|
89 | 88 | private final Duration purgeNodesInterval;
|
@@ -205,14 +204,7 @@ public void add(Node node) {
|
205 | 204 | try {
|
206 | 205 | nodes.put(node.getId(), node);
|
207 | 206 | model.add(initialNodeStatus);
|
208 |
| - ScheduledFuture<?> future = |
209 |
| - nodeHealthCheckService.scheduleAtFixedRate( |
210 |
| - GuardedRunnable.guard(healthCheck), |
211 |
| - healthcheckInterval.toMillis(), |
212 |
| - healthcheckInterval.toMillis(), |
213 |
| - TimeUnit.MILLISECONDS); |
214 | 207 | allChecks.put(node.getId(), healthCheck);
|
215 |
| - scheduledHealthChecks.put(node.getId(), future); |
216 | 208 | } finally {
|
217 | 209 | writeLock.unlock();
|
218 | 210 | }
|
@@ -243,16 +235,7 @@ public void remove(NodeId nodeId) {
|
243 | 235 | Node node = nodes.remove(nodeId);
|
244 | 236 | model.remove(nodeId);
|
245 | 237 |
|
246 |
| - // Get the health check runnable and remove it from executor service to prevent leaks |
247 |
| - Runnable healthCheck = allChecks.remove(nodeId); |
248 |
| - if (healthCheck != null) { |
249 |
| - ScheduledFuture<?> future = scheduledHealthChecks.remove(nodeId); |
250 |
| - if (future != null) { |
251 |
| - future.cancel(false); // false means don't interrupt if running |
252 |
| - LOG.log( |
253 |
| - getDebugLogLevel(), String.format("Health check task for node %s cancelled", nodeId)); |
254 |
| - } |
255 |
| - } |
| 238 | + allChecks.remove(nodeId); |
256 | 239 |
|
257 | 240 | if (node instanceof RemoteNode) {
|
258 | 241 | try {
|
@@ -317,9 +300,19 @@ public void runHealthChecks() {
|
317 | 300 | readLock.unlock();
|
318 | 301 | }
|
319 | 302 |
|
320 |
| - for (Runnable nodeHealthCheck : nodeHealthChecks.values()) { |
321 |
| - nodeHealthCheck.run(); |
| 303 | + if (nodeHealthChecks.isEmpty()) { |
| 304 | + return; |
322 | 305 | }
|
| 306 | + |
| 307 | + List<Runnable> checks = new ArrayList<>(nodeHealthChecks.values()); |
| 308 | + int total = checks.size(); |
| 309 | + |
| 310 | + // Large deployments: process in parallel batches with controlled concurrency |
| 311 | + int batchSize = Math.max(10, total / 10); |
| 312 | + int maxConcurrentBatches = Math.min(5, Runtime.getRuntime().availableProcessors()); |
| 313 | + |
| 314 | + List<List<Runnable>> batches = partition(checks, batchSize); |
| 315 | + processBatchesInParallel(batches, maxConcurrentBatches); |
323 | 316 | }
|
324 | 317 |
|
325 | 318 | @Override
|
@@ -407,6 +400,49 @@ public boolean isReady() {
|
407 | 400 | }
|
408 | 401 | }
|
409 | 402 |
|
| 403 | + private void processBatchesInParallel(List<List<Runnable>> batches, int maxConcurrentBatches) { |
| 404 | + if (batches.isEmpty()) { |
| 405 | + return; |
| 406 | + } |
| 407 | + List<CompletableFuture<Void>> inFlight = new ArrayList<>(); |
| 408 | + for (List<Runnable> batch : batches) { |
| 409 | + CompletableFuture<Void> fut = |
| 410 | + CompletableFuture.runAsync( |
| 411 | + () -> |
| 412 | + batch.parallelStream() |
| 413 | + .forEach( |
| 414 | + r -> { |
| 415 | + try { |
| 416 | + r.run(); |
| 417 | + } catch (Throwable t) { |
| 418 | + LOG.log( |
| 419 | + getDebugLogLevel(), "Health check execution failed in batch", t); |
| 420 | + } |
| 421 | + }), |
| 422 | + nodeHealthCheckService); |
| 423 | + inFlight.add(fut); |
| 424 | + if (inFlight.size() >= maxConcurrentBatches) { |
| 425 | + CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join(); |
| 426 | + inFlight.clear(); |
| 427 | + } |
| 428 | + } |
| 429 | + if (!inFlight.isEmpty()) { |
| 430 | + CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join(); |
| 431 | + } |
| 432 | + } |
| 433 | + |
| 434 | + private static List<List<Runnable>> partition(List<Runnable> list, int size) { |
| 435 | + List<List<Runnable>> batches = new ArrayList<>(); |
| 436 | + if (list.isEmpty() || size <= 0) { |
| 437 | + return batches; |
| 438 | + } |
| 439 | + for (int i = 0; i < list.size(); i += size) { |
| 440 | + int end = Math.min(i + size, list.size()); |
| 441 | + batches.add(new ArrayList<>(list.subList(i, end))); |
| 442 | + } |
| 443 | + return batches; |
| 444 | + } |
| 445 | + |
410 | 446 | private Runnable asRunnableHealthCheck(Node node) {
|
411 | 447 | HealthCheck healthCheck = node.getHealthCheck();
|
412 | 448 | NodeId id = node.getId();
|
@@ -535,5 +571,25 @@ public Node getNode(URI uri) {
|
535 | 571 | @Override
|
536 | 572 | public void close() {
|
537 | 573 | LOG.info("Shutting down LocalNodeRegistry");
|
| 574 | + Lock writeLock = lock.writeLock(); |
| 575 | + writeLock.lock(); |
| 576 | + try { |
| 577 | + allChecks.clear(); |
| 578 | + nodes |
| 579 | + .values() |
| 580 | + .forEach( |
| 581 | + n -> { |
| 582 | + if (n instanceof RemoteNode) { |
| 583 | + try { |
| 584 | + ((RemoteNode) n).close(); |
| 585 | + } catch (Exception e) { |
| 586 | + LOG.log(Level.WARNING, "Unable to close node properly: " + e.getMessage()); |
| 587 | + } |
| 588 | + } |
| 589 | + }); |
| 590 | + nodes.clear(); |
| 591 | + } finally { |
| 592 | + writeLock.unlock(); |
| 593 | + } |
538 | 594 | }
|
539 | 595 | }
|
0 commit comments