|
76 | 76 | import java.util.Optional; |
77 | 77 | import java.util.Set; |
78 | 78 | import java.util.concurrent.ThreadLocalRandom; |
| 79 | +import java.util.concurrent.TimeoutException; |
79 | 80 | import java.util.concurrent.atomic.AtomicReference; |
80 | 81 | import java.util.concurrent.locks.Lock; |
81 | 82 | import java.util.concurrent.locks.ReadWriteLock; |
@@ -362,14 +363,15 @@ private DistributedQueryRunner( |
362 | 363 | } |
363 | 364 | prestoClients = prestoClientsBuilder.build(); |
364 | 365 |
|
365 | | - long start = nanoTime(); |
366 | | - while (!allNodesGloballyVisible()) { |
367 | | - Assertions.assertLessThan(nanosSince(start), new Duration(100, SECONDS)); |
368 | | - MILLISECONDS.sleep(10); |
| 366 | + try { |
| 367 | + waitForAllNodesGloballyVisible(); |
| 368 | + } |
| 369 | + catch (TimeoutException e) { |
| 370 | + closer.close(); |
| 371 | + throw e; |
369 | 372 | } |
370 | | - log.info("Announced servers in %s", nanosSince(start).convertToMostSuccinctTimeUnit()); |
371 | 373 |
|
372 | | - start = nanoTime(); |
| 374 | + long start = nanoTime(); |
373 | 375 | for (TestingPrestoServer server : servers) { |
374 | 376 | server.getMetadata().registerBuiltInFunctions(AbstractTestQueries.CUSTOM_FUNCTIONS); |
375 | 377 | } |
@@ -517,22 +519,49 @@ else if (coordinatorSidecar) { |
517 | 519 | return server; |
518 | 520 | } |
519 | 521 |
|
520 | | - private boolean allNodesGloballyVisible() |
| 522 | + private void waitForAllNodesGloballyVisible() |
| 523 | + throws Exception |
521 | 524 | { |
522 | | - int expectedActiveNodesForRm = externalWorkers.size() + servers.size(); |
523 | | - int expectedActiveNodesForCoordinator = externalWorkers.size() + servers.size(); |
| 525 | + long startTimeInMs = nanoTime(); |
| 526 | + int expectedActiveNodes = externalWorkers.size() + servers.size(); |
| 527 | + Duration timeout = new Duration(100, SECONDS); |
524 | 528 |
|
525 | | - for (TestingPrestoServer server : servers) { |
| 529 | + for (int serverIndex = 0; serverIndex < servers.size(); ) { |
| 530 | + TestingPrestoServer server = servers.get(serverIndex); |
526 | 531 | AllNodes allNodes = server.refreshNodes(); |
527 | 532 | int activeNodeCount = allNodes.getActiveNodes().size(); |
528 | 533 |
|
529 | | - if (!allNodes.getInactiveNodes().isEmpty() || |
530 | | - (server.isCoordinator() && activeNodeCount != expectedActiveNodesForCoordinator) || |
531 | | - (server.isResourceManager() && activeNodeCount != expectedActiveNodesForRm)) { |
532 | | - return false; |
| 534 | + if (!allNodes.getInactiveNodes().isEmpty()) { |
| 535 | + throwTimeoutIfNotReady( |
| 536 | + startTimeInMs, |
| 537 | + timeout, |
| 538 | + format("Timed out waiting for all nodes to be globally visible. Inactive nodes: %s", allNodes.getInactiveNodes())); |
| 539 | + MILLISECONDS.sleep(10); |
| 540 | + serverIndex = 0; |
| 541 | + } |
| 542 | + else if ((server.isCoordinator() || server.isResourceManager()) && activeNodeCount != expectedActiveNodes) { |
| 543 | + throwTimeoutIfNotReady( |
| 544 | + startTimeInMs, |
| 545 | + timeout, |
| 546 | + format("Timed out waiting for all nodes to be globally visible. Node count: %s, expected: %s", activeNodeCount, expectedActiveNodes)); |
| 547 | + MILLISECONDS.sleep(10); |
| 548 | + serverIndex = 0; |
| 549 | + } |
| 550 | + else { |
| 551 | + log.info("Server %s has %s active nodes", server.getBaseUrl(), activeNodeCount); |
| 552 | + serverIndex++; |
533 | 553 | } |
534 | 554 | } |
535 | | - return true; |
| 555 | + |
| 556 | + log.info("Announced servers in %s", nanosSince(startTimeInMs).convertToMostSuccinctTimeUnit()); |
| 557 | + } |
| 558 | + |
| 559 | + private static void throwTimeoutIfNotReady(long startTimeInMs, Duration timeout, String message) |
| 560 | + throws TimeoutException |
| 561 | + { |
| 562 | + if (nanosSince(startTimeInMs).compareTo(timeout) >= 0) { |
| 563 | + throw new TimeoutException(message); |
| 564 | + } |
536 | 565 | } |
537 | 566 |
|
538 | 567 | public TestingPrestoClient getRandomClient() |
|
0 commit comments