Skip to content

Commit e836618

Browse files
committed
Fix the Spark 2.1 cluster timing issue for remote debugging.
Signed-off-by: Wei Zhang <[email protected]>
1 parent 8cc0b53 commit e836618

File tree

2 files changed: +21 additions, -13 deletions

2 files changed: +21 additions, -13 deletions

PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/run/SparkBatchJobDebuggerRunner.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import java.net.URI;
6363
import java.net.URISyntaxException;
6464
import java.net.UnknownServiceException;
65+
import java.util.AbstractMap;
6566
import java.util.AbstractMap.SimpleEntry;
6667
import java.util.HashMap;
6768
import java.util.Optional;
@@ -138,7 +139,7 @@ protected void execute(@NotNull ExecutionEnvironment environment, @Nullable Call
138139
debugProcessPhaser = new Phaser(1);
139140

140141
Observable.create((Observable.OnSubscribe<String>) ob ->
141-
createDebugJobSession(submitModel).subscribe(debugJobClusterPair -> {
142+
createDebugJobSession(submitModel, ob).subscribe(debugJobClusterPair -> {
142143
final SparkBatchRemoteDebugJob remoteDebugJob = debugJobClusterPair.getKey();
143144
final IClusterDetail clusterDetail = debugJobClusterPair.getValue();
144145
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
@@ -176,16 +177,9 @@ protected void execute(@NotNull ExecutionEnvironment environment, @Nullable Call
176177

177178
ob.onNext("Info: Spark Job Driver debugging started.");
178179

179-
Subscription livyLogSubscription = submitModel
180-
.jobLogObservable(remoteDebugJob.getBatchId(), clusterDetail)
181-
.subscribeOn(Schedulers.io())
182-
.subscribe();
183-
184180
// Await for all debug processes finish
185181
debugProcessPhaser.arriveAndAwaitAdvance();
186182
ob.onCompleted();
187-
188-
livyLogSubscription.unsubscribe();
189183
}, ob::onError))
190184
.subscribe(
191185
info -> HDInsightUtil.showInfoOnSubmissionMessageWindow(project, info),
@@ -252,7 +246,7 @@ private ExecutionEnvironment buildChildEnvironment(@NotNull ExecutionEnvironment
252246
* Create a Debug Spark Job session with building, deploying and submitting
253247
*/
254248
private Single<SimpleEntry<SparkBatchRemoteDebugJob, IClusterDetail>> createDebugJobSession(
255-
@NotNull SparkSubmitModel submitModel) {
249+
@NotNull SparkSubmitModel submitModel, Subscriber<? super String> debugSessionSub) {
256250
SparkSubmissionParameter submissionParameter = submitModel.getSubmissionParameter();
257251
String selectedClusterName = submissionParameter.getClusterName();
258252

@@ -285,11 +279,23 @@ private Single<SimpleEntry<SparkBatchRemoteDebugJob, IClusterDetail>> createDebu
285279
submitModel.tryToCreateBatchSparkDebugJob(selectedClusterDetail);
286280
setDebugJob(remoteDebugJob);
287281

282+
return new SimpleEntry<>(remoteDebugJob, selectedClusterDetail);
283+
} catch (Exception e) {
284+
HDInsightUtil.setJobRunningStatus(submitModel.getProject(), false);
285+
throw propagate(e);
286+
}})
287+
.flatMap(jobClusterPair -> jobClusterPair.getKey().getSubmissionLog()
288+
.map(AbstractMap.SimpleImmutableEntry::getValue)
289+
.doOnNext(debugSessionSub::onNext)
290+
.doOnError(debugSessionSub::onError)
291+
.last()
292+
.toSingle()
293+
.map(message -> jobClusterPair))
294+
.doOnSuccess(jobClusterPair -> {
295+
try {
288296
SparkBatchDebugSession session = createSparkBatchDebugSession(
289-
selectedClusterDetail.getConnectionUrl(), submitModel.getAdvancedConfigModel()).open();
297+
jobClusterPair.getValue().getConnectionUrl(), submitModel.getAdvancedConfigModel()).open();
290298
setDebugSession(session);
291-
292-
return new SimpleEntry<>(remoteDebugJob, selectedClusterDetail);
293299
} catch (Exception e) {
294300
HDInsightUtil.setJobRunningStatus(submitModel.getProject(), false);
295301
throw propagate(e);

Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/common/SparkBatchJob.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -638,6 +638,7 @@ public Observable<SimpleImmutableEntry<MessageInfoType, String>> getSubmissionLo
638638
boolean isSubmitting = true;
639639

640640
while (isSubmitting) {
641+
Boolean isAppIdAllocated = !this.getSparkJobApplicationIdObservable().isEmpty().toBlocking().last();
641642
String logUrl = String.format("%s/%d/log?from=%d&size=%d",
642643
this.getConnectUri().toString(), batchId, start, maxLinesPerGet);
643644

@@ -656,8 +657,9 @@ public Observable<SimpleImmutableEntry<MessageInfoType, String>> getSubmissionLo
656657

657658
// Retry interval
658659
if (linesGot == 0) {
660+
isSubmitting = this.getState().equals("starting") && !isAppIdAllocated;
661+
659662
sleep(TimeUnit.SECONDS.toMillis(this.getDelaySeconds()));
660-
isSubmitting = this.getState().equals("starting");
661663
}
662664
}
663665
} catch (IOException ex) {

0 commit comments

Comments (0)