Commit 5d0ffd8

Fix the timeout issue after submitting the job

Also print the error message.

Signed-off-by: Wei Zhang <[email protected]>
1 parent 5675622 commit 5d0ffd8

File tree

3 files changed (+110, -61 lines)


PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/run/SparkBatchJobRemoteProcess.java

Lines changed: 45 additions & 21 deletions
@@ -28,15 +28,12 @@
 import com.intellij.remote.RemoteProcess;
 import com.microsoft.azure.hdinsight.common.MessageInfoType;
 import com.microsoft.azure.hdinsight.sdk.cluster.IClusterDetail;
-import com.microsoft.azure.hdinsight.spark.common.SparkBatchJob;
-import com.microsoft.azure.hdinsight.spark.common.SparkJobException;
-import com.microsoft.azure.hdinsight.spark.common.SparkSubmitHelper;
-import com.microsoft.azure.hdinsight.spark.common.SparkSubmitModel;
+import com.microsoft.azure.hdinsight.spark.common.*;
 import com.microsoft.azure.hdinsight.spark.jobs.JobUtils;
 import org.apache.commons.io.output.NullOutputStream;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
-import rx.Subscription;
+import rx.*;
 import rx.schedulers.Schedulers;
 import rx.subjects.PublishSubject;
 
@@ -45,6 +42,7 @@
 import java.io.OutputStream;
 import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 
 import static com.microsoft.azure.hdinsight.common.MessageInfoType.Info;
 import static rx.exceptions.Exceptions.propagate;
@@ -172,25 +170,51 @@ public void start() {
                     submitModel.getSubmissionParameter().setFilePath(clusterArtifactUriPair.getValue());
                     return JobUtils.submit(cluster, submitModel.getSubmissionParameter()).subscribeOn(Schedulers.io());
                 })
-                .map(job -> {
-                    try {
-                        sparkJob = job;
-
-                        jobLogSubscription = job.getSubmissionLog()
-                                .subscribeOn(Schedulers.io())
-                                .subscribe(ctrlSubject::onNext, ctrlSubject::onError);
-
-                        jobStderrLogInputSteam.attachJob(job);
-                        jobStdoutLogInputSteam.attachJob(job);
-                    } catch (IOException e) {
-                        throw propagate(e);
-                    }
+                .doOnEach(notification -> {
+                    SparkBatchJob job = notification.getValue();
 
-                    return job;
+                    jobLogSubscription = job.getSubmissionLog()
+                            .subscribeOn(Schedulers.io())
+                            .subscribe(ctrlSubject::onNext, ctrlSubject::onError);
                 })
                 .toObservable()
-                .flatMap(SparkBatchJob::getJobDoneObservable)
-                .subscribe(state -> disconnect(), err -> {
+                .flatMap(job -> Observable
+                        .create((Subscriber<? super SparkBatchJob> ob) -> {
+                            try {
+                                jobStderrLogInputSteam.attachJob(job);
+                                jobStdoutLogInputSteam.attachJob(job);
+
+                                sparkJob = job;
+
+                                ob.onNext(job);
+                            } catch (IOException e) {
+                                ob.onError(e);
+                            }
+                        })
+                        .retryWhen(attempts -> attempts.flatMap(err -> {
+                            try {
+                                final String state = job.getState();
+
+                                if (state.equals("starting") || state.equals("not_started")) {
+                                    logInfo("Job is waiting for start due to cluster busy, please wait or disconnect (The job will run when the cluster is free).");
+
+                                    return Observable.timer(5, TimeUnit.SECONDS);
+                                }
+                            } catch (IOException ignored) {
+                            }
+
+                            return Observable.error(new SparkJobException("Spark Job Service not available, please check HDInsight cluster status."));
+                        })))
+                .flatMap(runningJob -> runningJob.getJobDoneObservable().subscribeOn(Schedulers.io()))
+                .subscribe(sdPair -> {
+                    if (sdPair.getKey() == SparkBatchJobState.SUCCESS) {
+                        logInfo("Job run successfully.");
+                    } else {
+                        ctrlSubject.onNext(new SimpleImmutableEntry<>(MessageInfoType.Error, "Job state is " + sdPair.getKey().toString()));
+                        ctrlSubject.onNext(new SimpleImmutableEntry<>(MessageInfoType.Error, "Diagnostics: " + sdPair.getValue()));
+                    }
+                    disconnect();
+                }, err -> {
                     ctrlSubject.onError(err);
                     disconnect();
                 });
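Note on the change above: the direct .flatMap(SparkBatchJob::getJobDoneObservable) subscription is replaced by an Observable.create(...).retryWhen(...) chain, so attaching to the job logs is retried every 5 seconds while Livy still reports the batch as "starting" or "not_started", and only fails with a SparkJobException for any other state. The standalone sketch below shows that retryWhen pattern in isolation; it is not part of the commit. The RetryWhenSketch class, the simulated "not_started" failure, and the attempt counter are hypothetical stand-ins for SparkBatchJob.getState(); it assumes RxJava 1.x.

// Standalone sketch of the retryWhen pattern used above (not part of this commit).
// The simulated "not_started" failure stands in for the cluster-busy states; assumes RxJava 1.x.
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable;

public class RetryWhenSketch {
    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();

        // Simulate an attach step that fails while the "cluster" has not started the job yet.
        Observable<String> attach = Observable.fromCallable(() -> {
            if (attempts.incrementAndGet() <= 2) {
                throw new IllegalStateException("not_started");
            }
            return "attached";
        });

        String result = attach
                .retryWhen(errors -> errors.flatMap(err -> {
                    if ("not_started".equals(err.getMessage())) {
                        System.out.println("Job is waiting for start, retrying in 5 seconds...");
                        return Observable.timer(5, TimeUnit.SECONDS); // emission triggers a re-subscribe
                    }
                    return Observable.error(err);                     // any other failure: give up
                }))
                .toBlocking()
                .single();

        System.out.println(result); // prints "attached" after two retries
    }
}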

PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/run/SparkBatchJobSubmissionState.java

Lines changed: 1 addition & 0 deletions
@@ -152,6 +152,7 @@ public ExecutionResult execute(Executor executor, @NotNull ProgramRunner program
                         }});
 
                         ctrlMessageView.print("ERROR: " + err.getMessage(), ConsoleViewContentType.ERROR_OUTPUT);
+                        disconnectAction.setEnabled(false);
                     },
                     () -> disconnectAction.setEnabled(false)
             );

Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/common/SparkBatchJob.java

Lines changed: 64 additions & 40 deletions
@@ -29,6 +29,7 @@
 import com.microsoft.azure.hdinsight.sdk.rest.yarn.rm.App;
 import com.microsoft.azure.hdinsight.sdk.rest.yarn.rm.AppResponse;
 import rx.Observable;
+import rx.Subscriber;
 
 import java.io.IOException;
 import java.net.URI;
@@ -42,7 +43,6 @@
 import static com.microsoft.azure.hdinsight.common.MessageInfoType.Error;
 import static com.microsoft.azure.hdinsight.common.MessageInfoType.Log;
 import static java.lang.Thread.sleep;
-import static rx.exceptions.Exceptions.propagate;
 
 public class SparkBatchJob implements ISparkBatchJob, ILogger {
     /**
@@ -270,7 +270,7 @@ public String getState() throws IOException {
             }
         } while (++retries < this.getRetriesMax());
 
-        throw new UnknownServiceException("Unknown service error after " + --retries + " retries");
+        throw new UnknownServiceException("Failed to get job state: Unknown service error after " + --retries + " retries");
     }
 
     /**
@@ -312,7 +312,7 @@ protected String getSparkJobApplicationId(URI batchBaseUri, int batchId) throws
             }
         } while (++retries < this.getRetriesMax());
 
-        throw new UnknownServiceException("Unknown service error after " + --retries + " retries");
+        throw new UnknownServiceException("Failed to get job Application ID: Unknown service error after " + --retries + " retries");
     }
 
     /**
@@ -356,7 +356,7 @@ protected App getSparkJobYarnApplication(URI batchBaseUri, String applicationID)
             }
         } while (++retries < this.getRetriesMax());
 
-        throw new UnknownServiceException("Unknown service error after " + --retries + " retries");
+        throw new UnknownServiceException("Failed to get job Yarn application: Unknown service error after " + --retries + " retries");
     }
 
     /**
@@ -399,7 +399,7 @@ public String getSparkJobDriverLogUrl(URI batchBaseUri, int batchId) throws IOEx
             }
         } while (++retries < this.getRetriesMax());
 
-        throw new UnknownServiceException("Unknown service error after " + --retries + " retries");
+        throw new UnknownServiceException("Failed to get job driver log URL: Unknown service error after " + --retries + " retries");
     }
 
     /**
@@ -505,45 +505,69 @@ public boolean isActive() throws IOException {
             }
         } while (++retries < this.getRetriesMax());
 
-        throw new UnknownServiceException("Unknown service error after " + --retries + " retries");
+        throw new UnknownServiceException("Failed to detect job activity: Unknown service error after " + --retries + " retries");
     }
 
-    public boolean isLogAggregated() throws IOException {
-        String applicationId = this.getSparkJobApplicationId(this.getConnectUri(), this.getBatchId());
-        App yarnApp = this.getSparkJobYarnApplication(this.getConnectUri(), applicationId);
-
-        switch (yarnApp.getLogAggregationStatus().toUpperCase()) {
-            case "SUCCEEDED":
-                return true;
-            case "DISABLED":
-            case "NOT_START":
-            case "RUNNING":
-            case "RUNNING_WITH_FAILURE":
-            case "FAILED":
-            case "TIME_OUT":
-            default:
-                return false;
-        }
-    }
+    public Observable<SimpleImmutableEntry<SparkBatchJobState, String>> getJobDoneObservable() {
+        return Observable.create((Subscriber<? super SimpleImmutableEntry<SparkBatchJobState, String>> ob) -> {
+            try {
+                boolean isJobActive = true;
+                boolean isLogAggregateDone = false;
+                SparkBatchJobState state = SparkBatchJobState.NOT_STARTED;
+                String applicationId = null;
+                String diagnostics = "";
+
+                while (true) {
+                    if (isJobActive) {
+                        HttpResponse httpResponse = this.getSubmission().getBatchSparkJobStatus(
+                                this.getConnectUri().toString(), batchId);
+
+                        if (httpResponse.getCode() >= 200 && httpResponse.getCode() < 300) {
+                            SparkSubmitResponse jobResp = ObjectConvertUtils.convertJsonToObject(
+                                    httpResponse.getMessage(), SparkSubmitResponse.class)
+                                    .orElseThrow(() -> new UnknownServiceException(
+                                            "Bad spark job response: " + httpResponse.getMessage()));
+
+                            state = SparkBatchJobState.valueOf(jobResp.getState().toUpperCase());
+
+                            isJobActive = !state.isJobDone();
+                            applicationId = jobResp.getAppId();
+                        }
+                    }
 
-    public Observable<SparkBatchJobState> getJobDoneObservable() {
-        return Observable.interval(200, TimeUnit.MILLISECONDS)
-                .map((times) -> {
-                    try {
-                        return getState();
-                    } catch (IOException e) {
-                        throw propagate(e);
+                    if (!isLogAggregateDone && applicationId != null) {
+                        App yarnApp = this.getSparkJobYarnApplication(this.getConnectUri(), applicationId);
+                        diagnostics = yarnApp.getDiagnostics();
+
+                        switch (yarnApp.getLogAggregationStatus().toUpperCase()) {
+                            case "SUCCEEDED":
+                            case "FAILED":
+                                isLogAggregateDone = true;
+                                break;
+                            case "DISABLED":
+                            case "NOT_START":
+                            case "RUNNING":
+                            case "RUNNING_WITH_FAILURE":
+                            case "TIME_OUT":
+                            default:
+                                isLogAggregateDone = false;
+                        }
                     }
-                })
-                .map(s -> SparkBatchJobState.valueOf(s.toUpperCase()))
-                .filter(SparkBatchJobState::isJobDone)
-                .filter((state) -> {
-                    try {
-                        return isLogAggregated();
-                    } catch (IOException e) {
-                        throw propagate(e);
+
+                    // Retry interval
+                    if (!isJobActive && isLogAggregateDone) {
+                        ob.onNext(new SimpleImmutableEntry<>(state, diagnostics));
+                        break;
+                    } else {
+                        sleep(1000);
                     }
-                })
-                .delay(3, TimeUnit.SECONDS);
+                }
+            } catch (IOException ex) {
+                ob.onError(ex);
+            } catch (InterruptedException ignored) {
+            } finally {
+                ob.onCompleted();
+            }
+        });
     }
 }
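For reference, the polling loop above relies on SparkBatchJobState.isJobDone() to decide when the Livy batch has reached a terminal state. That enum is defined elsewhere in the repository and is not part of this diff; the sketch below is only a plausible shape. Only NOT_STARTED and SUCCESS actually appear in the diff; the remaining members and the terminal-state set are assumptions based on the Livy batch states the code parses with valueOf(...toUpperCase()).

// Hypothetical sketch of the SparkBatchJobState enum referenced above; the real definition
// lives elsewhere in the repo and may differ. Members beyond NOT_STARTED and SUCCESS are
// assumed; isJobDone() marks the terminal states that end the polling loop.
public enum SparkBatchJobState {
    NOT_STARTED,
    STARTING,
    RUNNING,
    BUSY,
    SHUTTING_DOWN,
    ERROR,
    DEAD,
    KILLED,
    SUCCESS;

    public boolean isJobDone() {
        return this == ERROR || this == DEAD || this == KILLED || this == SUCCESS;
    }
}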
