Skip to content

Commit 596878d

Browse files
committed
Fix the regression issue of get Spark job done on Spark 2.2
Signed-off-by: Wei Zhang <[email protected]>
1 parent 71c5969 commit 596878d

File tree

1 file changed

+7
-4
lines changed
  • Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/common

1 file changed

+7
-4
lines changed

Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/common/SparkBatchJob.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -750,11 +750,11 @@ public boolean isActive() throws IOException {
750750
public Observable<SimpleImmutableEntry<SparkBatchJobState, String>> getJobDoneObservable() {
751751
return Observable.create((Subscriber<? super SimpleImmutableEntry<SparkBatchJobState, String>> ob) -> {
752752
try {
753-
boolean isJobActive = true;
753+
boolean isJobActive;
754754
SparkBatchJobState state = SparkBatchJobState.NOT_STARTED;
755755
String diagnostics = "";
756756

757-
while (isJobActive) {
757+
do {
758758
HttpResponse httpResponse = this.getSubmission().getBatchSparkJobStatus(
759759
this.getConnectUri().toString(), batchId);
760760

@@ -768,17 +768,20 @@ public Observable<SimpleImmutableEntry<SparkBatchJobState, String>> getJobDoneOb
768768
diagnostics = String.join("\n", jobResp.getLog());
769769

770770
isJobActive = !state.isJobDone();
771+
} else {
772+
isJobActive = false;
771773
}
772774

775+
773776
// Retry interval
774777
sleep(1000);
775-
}
778+
} while (isJobActive);
776779

777780
ob.onNext(new SimpleImmutableEntry<>(state, diagnostics));
781+
ob.onCompleted();
778782
} catch (IOException ex) {
779783
ob.onError(ex);
780784
} catch (InterruptedException ignored) {
781-
} finally {
782785
ob.onCompleted();
783786
}
784787
});

0 commit comments

Comments
 (0)