Skip to content

Commit bb9c550

Browse files
committed
Fix the job done observable complete issue
Signed-off-by: Wei Zhang <[email protected]>
1 parent 5d0ffd8 commit bb9c550

File tree

2 files changed

+8
-11
lines changed

2 files changed

+8
-11
lines changed

PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/run/SparkBatchJobRemoteProcess.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ public void start() {
187187
sparkJob = job;
188188

189189
ob.onNext(job);
190+
ob.onCompleted();
190191
} catch (IOException e) {
191192
ob.onError(e);
192193
}
@@ -213,10 +214,11 @@ public void start() {
213214
ctrlSubject.onNext(new SimpleImmutableEntry<>(MessageInfoType.Error, "Job state is " + sdPair.getKey().toString()));
214215
ctrlSubject.onNext(new SimpleImmutableEntry<>(MessageInfoType.Error, "Diagnostics: " + sdPair.getValue()));
215216
}
216-
disconnect();
217217
}, err -> {
218218
ctrlSubject.onError(err);
219219
disconnect();
220+
}, () -> {
221+
disconnect();
220222
});
221223
}
222224

Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/common/SparkBatchJob.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -517,7 +517,7 @@ public Observable<SimpleImmutableEntry<SparkBatchJobState, String>> getJobDoneOb
517517
String applicationId = null;
518518
String diagnostics = "";
519519

520-
while (true) {
520+
while (isJobActive || !isLogAggregateDone) {
521521
if (isJobActive) {
522522
HttpResponse httpResponse = this.getSubmission().getBatchSparkJobStatus(
523523
this.getConnectUri().toString(), batchId);
@@ -533,9 +533,7 @@ public Observable<SimpleImmutableEntry<SparkBatchJobState, String>> getJobDoneOb
533533
isJobActive = !state.isJobDone();
534534
applicationId = jobResp.getAppId();
535535
}
536-
}
537-
538-
if (!isLogAggregateDone && applicationId != null) {
536+
} else {
539537
App yarnApp = this.getSparkJobYarnApplication(this.getConnectUri(), applicationId);
540538
diagnostics = yarnApp.getDiagnostics();
541539

@@ -555,13 +553,10 @@ public Observable<SimpleImmutableEntry<SparkBatchJobState, String>> getJobDoneOb
555553
}
556554

557555
// Retry interval
558-
if (!isJobActive && isLogAggregateDone) {
559-
ob.onNext(new SimpleImmutableEntry<>(state, diagnostics));
560-
break;
561-
} else {
562-
sleep(1000);
563-
}
556+
sleep(1000);
564557
}
558+
559+
ob.onNext(new SimpleImmutableEntry<>(state, diagnostics));
565560
} catch (IOException ex) {
566561
ob.onError(ex);
567562
} catch (InterruptedException ignored) {

0 commit comments

Comments
 (0)