Skip to content

Commit 2578717

Browse files
committed
Fix the Spark remote debugging issue on Spark 2.1 cluster
Signed-off-by: Wei Zhang <[email protected]>
1 parent 1fb8c7c commit 2578717

File tree

8 files changed

+363
-41
lines changed

8 files changed

+363
-41
lines changed

PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/run/SparkBatchJobDebuggerRunner.java

Lines changed: 18 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ public class SparkBatchJobDebuggerRunner extends GenericDebuggerRunner {
8585

8686
// More complex pattern, please use grok
8787
private final Pattern simpleLogPattern = Pattern.compile("\\d{1,2}[/-]\\d{1,2}[/-]\\d{1,2} \\d{1,2}:\\d{1,2}:\\d{1,2} (INFO|WARN|ERROR) .*", Pattern.DOTALL);
88-
private final Pattern executorLogUrlPattern = Pattern.compile("^\\s+SPARK_LOG_URL_STDERR -> https?://([^:]+):?\\d*/node/containerlogs/(container.*)/livy/stderr.*");
8988

9089
public void setDebugJob(SparkBatchRemoteDebugJob debugJob) {
9190
this.debugJob = debugJob;
@@ -158,6 +157,7 @@ protected void execute(@NotNull ExecutionEnvironment environment, @Nullable Call
158157

159158
// Create Driver debug process
160159
createDebugProcess(
160+
remoteDebugJob,
161161
environment,
162162
callback,
163163
submissionState,
@@ -383,7 +383,8 @@ public SparkBatchDebugSession createSparkBatchDebugSession(
383383
/*
384384
* Create a debug process, if it's a Driver process, the following Executor processes will be created
385385
*/
386-
private void createDebugProcess(@NotNull ExecutionEnvironment environment,
386+
private void createDebugProcess(SparkBatchRemoteDebugJob remoteDebugJob,
387+
@NotNull ExecutionEnvironment environment,
387388
@Nullable Callback callback,
388389
@NotNull SparkBatchJobSubmissionState submissionState,
389390
boolean isDriver,
@@ -392,7 +393,7 @@ private void createDebugProcess(@NotNull ExecutionEnvironment environment,
392393
String remoteHost,
393394
int remotePort,
394395
String logUrl,
395-
final CredentialsProvider credentialsProvider ) {
396+
final CredentialsProvider credentialsProvider) {
396397
SparkBatchDebugSession session = getDebugSession().orElse(null);
397398
if (session == null) {
398399
return;
@@ -412,7 +413,7 @@ private void createDebugProcess(@NotNull ExecutionEnvironment environment,
412413
if (isDriver) {
413414
debugProcessOb = debugProcessOb.share();
414415

415-
matchExecutorFromDebugProcessObservable(debugProcessOb)
416+
getExecutorsObservable(remoteDebugJob)
416417
.subscribe(hostContainerPair -> {
417418
String host = hostContainerPair.getKey();
418419
String containerId = hostContainerPair.getValue();
@@ -448,7 +449,7 @@ private void createDebugProcess(@NotNull ExecutionEnvironment environment,
448449
}
449450

450451
// create debug process for the Spark job executor
451-
createDebugProcess( environment,
452+
createDebugProcess(remoteDebugJob, environment,
452453
callback,
453454
newExecutorState,
454455
false,
@@ -515,6 +516,18 @@ public void processTerminated(ProcessEvent processEvent) {
515516
.subscribe(debugProcessConsole::onNext, debugSessionSubscriber::onError, debugPhaser::arriveAndDeregister);
516517
}
517518

519+
/**
520+
* To get Executor from Yarn UI App Attempt page
521+
*/
522+
private Observable<SimpleEntry<String, String>> getExecutorsObservable(@NotNull SparkBatchRemoteDebugJob sparkDebugJob) {
523+
return sparkDebugJob
524+
.getSparkJobYarnCurrentAppAttempt()
525+
.flatMap(appAttempt -> sparkDebugJob.getSparkJobYarnContainersObservable(appAttempt)
526+
.filter(hostContainerPair -> !StringUtils.equals(
527+
hostContainerPair.getValue(), appAttempt.getContainerId())))
528+
.map(kv -> new SimpleEntry<>(kv.getKey(), kv.getValue()));
529+
}
530+
518531
/*
519532
* Create an Observable for a debug process, the Yarn log 'stderr' will be considered as the events
520533
* with its type key.
@@ -547,39 +560,6 @@ private Observable<SimpleEntry<String, Key>> createDebugProcessObservable(
547560
.filter(lineKeyPair -> lineKeyPair.getKey() != null);
548561
}
549562

550-
/**
551-
* To match Executor lunch content from debug process Observable
552-
*
553-
* @param debugProcessOb the debug process Observable to match
554-
* @return matched Executor Observable, the event is SimpleEntry with host, containerId pair
555-
*/
556-
private Observable<SimpleEntry<String, String>> matchExecutorFromDebugProcessObservable(
557-
Observable<SimpleEntry<String, Key>> debugProcessOb) {
558-
PublishSubject<String> closeSubject = PublishSubject.create();
559-
PublishSubject<String> openSubject = PublishSubject.create();
560-
561-
return debugProcessOb
562-
.map(lineKeyPair -> {
563-
String line = lineKeyPair.getKey();
564-
565-
if (line.matches("^YARN executor launch context:$")) {
566-
openSubject.onNext("YARN executor launch");
567-
}
568-
569-
if (line.matches("^={5,}$")) {
570-
closeSubject.onNext("=====");
571-
}
572-
573-
return line;
574-
})
575-
.window(openSubject, s -> closeSubject)
576-
.flatMap(executorLunchContextOb -> executorLunchContextOb
577-
.map(executorLogUrlPattern::matcher)
578-
.filter(Matcher::matches)
579-
.map(matcher -> new SimpleEntry<>(matcher.group(1), matcher.group(2)))
580-
);
581-
}
582-
583563
protected int getLogReadBlockSize() {
584564
return 4096;
585565
}

Utils/hdinsight-node-common/Test/java/com/microsoft/azure/hdinsight/spark/common/SparkBatchRemoteDebugJobScenario.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222

2323
package com.microsoft.azure.hdinsight.spark.common;
2424

25-
import com.microsoft.azure.hdinsight.sdk.common.HttpResponse;
25+
import com.microsoft.azure.hdinsight.sdk.rest.yarn.rm.AppAttempt;
2626
import cucumber.api.java.Before;
27+
import cucumber.api.java.en.And;
2728
import cucumber.api.java.en.Given;
2829
import cucumber.api.java.en.Then;
2930
import org.mockito.ArgumentCaptor;
3031
import org.slf4j.Logger;
32+
import rx.Observable;
3133

3234
import java.net.URI;
3335
import java.util.HashMap;
@@ -228,4 +230,21 @@ public void checkGetSparkDriverHost(
228230
assertEquals(expectedHost, "__exception_got__");
229231
}
230232
}
233+
234+
@And("^mock method getSparkJobApplicationIdObservable to return '(.+)' Observable$")
235+
public void mockMethodGetSparkJobApplicationIdObservable(String appIdMock) {
236+
when(debugJobMock.getSparkJobApplicationIdObservable()).thenReturn(Observable.just(appIdMock));
237+
}
238+
239+
@Then("^getting current Yarn App attempt should be '(.+)'$")
240+
public void checkGetCurrentYarnAppAttemptResult(String appAttemptIdExpect) {
241+
when(debugJobMock.getConnectUri()).thenReturn(URI.create(httpServerMock.completeUrl("/")));
242+
243+
AppAttempt appAttempt = debugJobMock
244+
.getSparkJobYarnCurrentAppAttempt()
245+
.toBlocking()
246+
.first();
247+
248+
assertEquals(appAttemptIdExpect, appAttempt.getAppAttemptId());
249+
}
231250
}

Utils/hdinsight-node-common/Test/resources/com/microsoft/azure/hdinsight/spark/common/SparkBatchRemoteDebugJobScenario.feature

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,9 @@ Feature: Spark Batch Remote Debug Job Testing
6565
Given setup a mock livy service for GET request '/batch/9' to return '{"id":9,"state":"starting","appId":"application_1492415936046_0015","appInfo":{"driverLogUrl":"http://127.0.0.1:$port/yarnui/10.0.0.15/node/containerlogs/container_e02_1492415936046_0015_01_000001/livy","sparkUiUrl":"https://spkdbg.azurehdinsight.net/yarnui/hn/proxy/application_1492415936046_0015/"},"log":["\\t ApplicationMaster RPC port: -1","\\t queue: default","\\t start time: 1492569369011","\\t final status: UNDEFINED","\\t tracking URL: https://spkdbg.azurehdinsight.net/yarnui/hn/proxy/application_1492415936046_0015/","\\t user: livy","17/04/19 02:36:09 INFO ShutdownHookManager: Shutdown hook called","17/04/19 02:36:09 INFO ShutdownHookManager: Deleting directory /tmp/spark-1984dc9d-acd4-4648-9104-398431590f8e","YARN Diagnostics:","AM container is launched, waiting for AM container to Register with RM"]}' with status code 200
6666
And setup a mock livy service for GET request '/yarnui/ws/v1/cluster/apps/application_1492415936046_0015' to return '{ "app": { "amNodeLabelExpression": "", "finishedTime": 1493100184345, "startedTime": 1493097873053, "priority": 0, "applicationTags": "livy-batch-15-bcixsxv0", "applicationType": "SPARK", "clusterId": 1492780173422, "diagnostics": "Application application_1492780173422_0003 failed 5 times due to ApplicationMaster for attempt appattempt_1492780173422_0003_000005 timed out. Failing the application.", "trackingUrl": "http://hn0-zhwe-s.uhunwunss5gupibv1jib3beicb.lx.internal.cloudapp.net:8088/cluster/app/application_1492780173422_0003", "id": "application_1492780173422_0003", "user": "livy", "name": "SparkCore_WasbIOTest", "queue": "default", "state": "FAILED", "finalStatus": "FAILED", "progress": 100, "trackingUI": "History", "elapsedTime": 2311292, "amContainerLogs": "http://10.0.0.15:30060/node/containerlogs/container_e03_1492780173422_0003_05_000001/livy", "amHostHttpAddress": "10.0.0.15:30060", "allocatedMB": -1, "allocatedVCores": -1, "runningContainers": -1, "memorySeconds": 3549035, "vcoreSeconds": 2308, "queueUsagePercentage": 0, "clusterUsagePercentage": 0, "preemptedResourceMB": 0, "preemptedResourceVCores": 0, "numNonAMContainerPreempted": 0, "numAMContainerPreempted": 0, "logAggregationStatus": "SUCCEEDED", "unmanagedApplication": false }}' with status code 200
6767
Then getting Spark driver host from URL '/batch', batch ID 9 should be '__exception_got__'
68-
Then throw exception 'java.net.UnknownServiceException' with message 'The Livy job 9 on yarn is not running.'
68+
Then throw exception 'java.net.UnknownServiceException' with message 'The Livy job 9 on yarn is not running.'
69+
70+
Scenario: getSparkJobYarnCurrentAppAttempt integration test with response
71+
Given setup a mock livy service for GET request '/yarnui/ws/v1/cluster/apps/application_1513565654634_0011/appattempts' to return '{"appAttempts":{"appAttempt":[{"id":1,"startTime":1513673984219,"finishedTime":0,"containerId":"container_1513565654634_0011_01_000001","nodeHttpAddress":"10.0.0.6:30060","nodeId":"10.0.0.6:30050","logsLink":"http://10.0.0.6:30060/node/containerlogs/container_1513565654634_0011_01_000001/livy","blacklistedNodes":"","appAttemptId":"appattempt_1513565654634_0011_000001"},{"id":2,"startTime":1513673985219,"finishedTime":0,"containerId":"container_1513565654634_0011_01_000002","nodeHttpAddress":"10.0.0.7:30060","nodeId":"10.0.0.7:30050","logsLink":"http://10.0.0.7:30060/node/containerlogs/container_1513565654634_0011_01_000002/livy","blacklistedNodes":"","appAttemptId":"appattempt_1513565654634_0011_000002"}]}}' with status code 200
72+
And mock method getSparkJobApplicationIdObservable to return 'application_1513565654634_0011' Observable
73+
Then getting current Yarn App attempt should be 'appattempt_1513565654634_0011_000002'
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright (c) Microsoft Corporation
3+
* <p/>
4+
* All rights reserved.
5+
* <p/>
6+
* MIT License
7+
* <p/>
8+
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
9+
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
10+
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
11+
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
12+
* <p/>
13+
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
14+
* the Software.
15+
* <p/>
16+
* THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
17+
* THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
19+
* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20+
* SOFTWARE.
21+
*/
22+
23+
package com.microsoft.azure.hdinsight.sdk.rest.yarn.rm;
24+
25+
public class AppAttempt {
26+
private int id; // The app attempt id
27+
private String nodeId; // The node id of the node the attempt ran on
28+
private String nodeHttpAddress; // The node http address of the node the attempt ran on
29+
private String logsLink; // The http link to the app attempt logs
30+
private String containerId; // The id of the container for the app attempt
31+
private long startTime; // The start time of the attempt (in ms since epoch)
32+
private long finishedTime; // The end time of the attempt (in ms since epoch), 0 for not end
33+
private String blacklistedNodes; // Nodes blacklist
34+
private String appAttemptId; // App Attempt Id
35+
36+
public int getId() {
37+
return id;
38+
}
39+
40+
public void setId(int id) {
41+
this.id = id;
42+
}
43+
44+
public String getNodeId() {
45+
return nodeId;
46+
}
47+
48+
public void setNodeId(String nodeId) {
49+
this.nodeId = nodeId;
50+
}
51+
52+
public String getNodeHttpAddress() {
53+
return nodeHttpAddress;
54+
}
55+
56+
public void setNodeHttpAddress(String nodeHttpAddress) {
57+
this.nodeHttpAddress = nodeHttpAddress;
58+
}
59+
60+
public String getLogsLink() {
61+
return logsLink;
62+
}
63+
64+
public void setLogsLink(String logsLink) {
65+
this.logsLink = logsLink;
66+
}
67+
68+
public String getContainerId() {
69+
return containerId;
70+
}
71+
72+
public void setContainerId(String containerId) {
73+
this.containerId = containerId;
74+
}
75+
76+
public long getStartTime() {
77+
return startTime;
78+
}
79+
80+
public void setStartTime(long startTime) {
81+
this.startTime = startTime;
82+
}
83+
84+
public long getFinishedTime() {
85+
return finishedTime;
86+
}
87+
88+
public void setFinishedTime(long finishedTime) {
89+
this.finishedTime = finishedTime;
90+
}
91+
92+
public String getBlacklistedNodes() {
93+
return blacklistedNodes;
94+
}
95+
96+
public void setBlacklistedNodes(String blacklistedNodes) {
97+
this.blacklistedNodes = blacklistedNodes;
98+
}
99+
100+
public String getAppAttemptId() {
101+
return appAttemptId;
102+
}
103+
104+
public void setAppAttemptId(String appAttemptId) {
105+
this.appAttemptId = appAttemptId;
106+
}
107+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright (c) Microsoft Corporation
3+
* <p/>
4+
* All rights reserved.
5+
* <p/>
6+
* MIT License
7+
* <p/>
8+
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
9+
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
10+
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
11+
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
12+
* <p/>
13+
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
14+
* the Software.
15+
* <p/>
16+
* THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
17+
* THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
19+
* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20+
* SOFTWARE.
21+
*/
22+
23+
package com.microsoft.azure.hdinsight.sdk.rest.yarn.rm;
24+
25+
import java.util.List;
26+
27+
public class AppAttemptsResponse {
28+
public class AppAttempts {
29+
public List<AppAttempt> appAttempt;
30+
}
31+
32+
private AppAttempts appAttempts;
33+
34+
public AppAttempts getAppAttempts() {
35+
return appAttempts;
36+
}
37+
38+
public void setAppAttempts(AppAttempts appAttempts) {
39+
this.appAttempts = appAttempts;
40+
}
41+
}

0 commit comments

Comments
 (0)