|
26 | 26 | import com.gargoylesoftware.htmlunit.Cache; |
27 | 27 | import com.gargoylesoftware.htmlunit.ScriptException; |
28 | 28 | import com.gargoylesoftware.htmlunit.WebClient; |
| 29 | +import com.gargoylesoftware.htmlunit.html.DomElement; |
| 30 | +import com.gargoylesoftware.htmlunit.html.HtmlAnchor; |
29 | 31 | import com.gargoylesoftware.htmlunit.html.HtmlPage; |
30 | 32 | import com.gargoylesoftware.htmlunit.html.HtmlTableBody; |
31 | 33 | import com.microsoft.azure.hdinsight.common.MessageInfoType; |
|
42 | 44 | import rx.Subscriber; |
43 | 45 |
|
44 | 46 | import java.io.IOException; |
| 47 | +import java.net.MalformedURLException; |
45 | 48 | import java.net.URI; |
46 | 49 | import java.net.UnknownServiceException; |
47 | 50 | import java.util.AbstractMap.SimpleImmutableEntry; |
@@ -452,53 +455,36 @@ public Observable<AppAttempt> getSparkJobYarnCurrentAppAttempt() { |
452 | 455 | * @return The string pair Observable of Host and Container Id |
453 | 456 | */ |
454 | 457 | public Observable<SimpleImmutableEntry<String, String>> getSparkJobYarnContainersObservable(@NotNull AppAttempt appAttempt) { |
455 | | - return Observable.create((Subscriber<? super HtmlPage> ob) -> { |
456 | | - String getYarnUiAppAttemptsURL = getConnectUri() |
457 | | - .resolve("/yarnui/hn/cluster/appattempt/") |
458 | | - .resolve(appAttempt.getAppAttemptId()) |
459 | | - .toString(); |
460 | | - |
461 | | - |
462 | | - final WebClient HTTP_WEB_CLIENT = new WebClient(BrowserVersion.CHROME); |
463 | | - HTTP_WEB_CLIENT.setCache(globalCache); |
464 | | - |
465 | | - if (getSubmission().getCredentialsProvider() != null) { |
466 | | - HTTP_WEB_CLIENT.setCredentialsProvider(getSubmission().getCredentialsProvider()); |
467 | | - } |
468 | | - |
469 | | - try { |
470 | | - ob.onNext(HTTP_WEB_CLIENT.getPage(getYarnUiAppAttemptsURL)); |
471 | | - } catch (IOException e) { |
472 | | - log().warn("get Spark job Yarn attempts detail IO Error", e); |
473 | | - ob.onError(e); |
474 | | - } catch (ScriptException ignored) { |
475 | | - log().debug("get Spark job Yarn attempts detail browser rendering Error", ignored); |
476 | | - } finally { |
477 | | - ob.onCompleted(); |
478 | | - } |
479 | | - }) |
| 458 | + return Observable.just(appAttempt) |
| 459 | + .map(attempt -> getConnectUri() |
| 460 | + .resolve("/yarnui/hn/cluster/appattempt/") |
| 461 | + .resolve(attempt.getAppAttemptId()) |
| 462 | + .toString()) |
| 463 | + .flatMap(this::loadPageByBrowserObservable) |
480 | 464 | .retry(getRetriesMax()) |
481 | | - .delay(3, TimeUnit.SECONDS) // Workaround to waiting for the page loading finished |
482 | 465 | .repeatWhen(ob -> ob.delay(getDelaySeconds(), TimeUnit.SECONDS)) |
483 | 466 | .takeUntil(this::isSparkJobYarnAppAttemptNotJustLaunched) |
484 | 467 | .filter(this::isSparkJobYarnAppAttemptNotJustLaunched) |
485 | 468 | .flatMap(htmlPage -> { |
486 | | - // Get the container table by XPath |
487 | | - HtmlTableBody containerBody = htmlPage.getFirstByXPath("//*[@id=\"containers\"]/tbody"); |
488 | | - |
489 | | - return Observable |
490 | | - .from(containerBody |
491 | | - .getRows() |
492 | | - .stream() |
| 469 | + // Get the container table by XPath |
| 470 | + HtmlTableBody containerBody = htmlPage.getFirstByXPath("//*[@id=\"containers\"]/tbody"); |
| 471 | + |
| 472 | + return Observable |
| 473 | + .from(containerBody.getRows()) |
| 474 | + .flatMap(row -> Observable.just(getConnectUri()) |
| 475 | + .map(baseUri -> baseUri.resolve(((HtmlAnchor) row.getCell(3).getFirstChild()) |
| 476 | + .getHrefAttribute()) |
| 477 | + .toString()) |
| 478 | + .flatMap(this::loadPageByBrowserObservable) |
| 479 | + .filter(this::isSparkJobYarnContainerLogAvailable) |
| 480 | + .map(page -> row)) |
493 | 481 | .map(row -> { |
494 | 482 | String hostUrl = row.getCell(1).getTextContent().trim(); |
495 | 483 | String host = URI.create(hostUrl).getHost(); |
496 | 484 | String containerId = row.getCell(0).getTextContent().trim(); |
497 | 485 |
|
498 | 486 | return new SimpleImmutableEntry<>(host, containerId); |
499 | | - }) |
500 | | - .collect(Collectors.toList()) |
501 | | - ); |
| 487 | + }); |
502 | 488 | }); |
503 | 489 | } |
504 | 490 |
|
@@ -526,6 +512,38 @@ private Boolean isSparkJobYarnAppAttemptNotJustLaunched(@NotNull HtmlPage htmlPa |
526 | 512 | .orElse(false); |
527 | 513 | } |
528 | 514 |
|
| 515 | + private Boolean isSparkJobYarnContainerLogAvailable(@NotNull HtmlPage htmlPage) { |
| 516 | + Optional<DomElement> firstContent = Optional.ofNullable( |
| 517 | + htmlPage.getFirstByXPath("//*[@id=\"layout\"]/tbody/tr/td[2]")); |
| 518 | + |
| 519 | + return firstContent.map(DomElement::getTextContent) |
| 520 | + .map(line -> !line.trim() |
| 521 | + .toLowerCase() |
| 522 | + .contains("no logs available")) |
| 523 | + .orElse(false); |
| 524 | + } |
| 525 | + |
| 526 | + private Observable<HtmlPage> loadPageByBrowserObservable(String url) { |
| 527 | + final WebClient HTTP_WEB_CLIENT = new WebClient(BrowserVersion.CHROME); |
| 528 | + HTTP_WEB_CLIENT.setCache(globalCache); |
| 529 | + |
| 530 | + if (getSubmission().getCredentialsProvider() != null) { |
| 531 | + HTTP_WEB_CLIENT.setCredentialsProvider(getSubmission().getCredentialsProvider()); |
| 532 | + } |
| 533 | + |
| 534 | + return Observable.create(ob -> { |
| 535 | + try { |
| 536 | + ob.onNext(HTTP_WEB_CLIENT.getPage(url)); |
| 537 | + } catch (ScriptException ignored) { |
| 538 | + log().debug("get Spark job Yarn attempts detail browser rendering Error", ignored); |
| 539 | + } catch (IOException e) { |
| 540 | + ob.onError(e); |
| 541 | + } finally { |
| 542 | + ob.onCompleted(); |
| 543 | + } |
| 544 | + }); |
| 545 | + } |
| 546 | + |
529 | 547 | /** |
530 | 548 | * Get Spark Job driver log URL with retries |
531 | 549 | * |
|
0 commit comments