|
32 | 32 | import java.util.Map;
|
33 | 33 | import java.util.Optional;
|
34 | 34 | import java.util.Set;
|
35 |
| -import java.util.concurrent.CompletableFuture; |
36 | 35 | import java.util.concurrent.ConcurrentHashMap;
|
37 | 36 | import java.util.concurrent.ScheduledExecutorService;
|
38 | 37 | import java.util.concurrent.TimeUnit;
|
@@ -404,31 +403,22 @@ private void processBatchesInParallel(List<List<Runnable>> batches, int maxConcu
|
404 | 403 | if (batches.isEmpty()) {
|
405 | 404 | return;
|
406 | 405 | }
|
407 |
| - List<CompletableFuture<Void>> inFlight = new ArrayList<>(); |
408 |
| - for (List<Runnable> batch : batches) { |
409 |
| - CompletableFuture<Void> fut = |
410 |
| - CompletableFuture.runAsync( |
411 |
| - () -> |
412 |
| - batch.parallelStream() |
413 |
| - .forEach( |
414 |
| - r -> { |
415 |
| - try { |
416 |
| - r.run(); |
417 |
| - } catch (Throwable t) { |
418 |
| - LOG.log( |
419 |
| - getDebugLogLevel(), "Health check execution failed in batch", t); |
420 |
| - } |
421 |
| - }), |
422 |
| - nodeHealthCheckService); |
423 |
| - inFlight.add(fut); |
424 |
| - if (inFlight.size() >= maxConcurrentBatches) { |
425 |
| - CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join(); |
426 |
| - inFlight.clear(); |
427 |
| - } |
428 |
| - } |
429 |
| - if (!inFlight.isEmpty()) { |
430 |
| - CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join(); |
431 |
| - } |
| 406 | + |
| 407 | + // Process batches with controlled parallelism |
| 408 | + batches.parallelStream() |
| 409 | + .limit(maxConcurrentBatches) |
| 410 | + .forEach( |
| 411 | + batch -> |
| 412 | + batch.parallelStream() |
| 413 | + .forEach( |
| 414 | + r -> { |
| 415 | + try { |
| 416 | + r.run(); |
| 417 | + } catch (Throwable t) { |
| 418 | + LOG.log( |
| 419 | + getDebugLogLevel(), "Health check execution failed in batch", t); |
| 420 | + } |
| 421 | + })); |
432 | 422 | }
|
433 | 423 |
|
434 | 424 | private static List<List<Runnable>> partition(List<Runnable> list, int size) {
|
|
0 commit comments