Skip to content

Commit 2bb0abb

Browse files
committed
Introduce the process visiable scheduler for IDEA rxjava.
Signed-off-by: Wei Zhang <[email protected]>
1 parent 72d0471 commit 2bb0abb

File tree

5 files changed

+35
-54
lines changed

5 files changed

+35
-54
lines changed

PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/run/SparkBatchJobRemoteProcess.java

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -156,23 +156,17 @@ public Optional<Subscription> getJobSubscription() {
156156
public void start() {
157157
// Build, deploy and wait for the job done.
158158
jobSubscription = SparkSubmitHelper.getInstance().buildArtifact(project, submitModel.isLocalArtifact(), submitModel.getArtifact())
159-
.flatMap(artifact -> {
160-
logInfo("Deploy the jar file into cluster...");
161-
162-
return JobUtils.deployArtifact(
163-
submitModel.getArtifactPath(artifact.getName())
164-
.orElseThrow(() -> propagate(new SparkJobException("Can't find jar path to upload"))),
165-
submitModel.getSubmissionParameter().getClusterName(),
166-
ctrlSubject)
167-
.subscribeOn(IdeaSchedulers.processBarVisable());
168-
// .subscribeOn(Schedulers.io());
169-
})
159+
.flatMap(artifact -> JobUtils.deployArtifact(
160+
submitModel.getArtifactPath(artifact.getName())
161+
.orElseThrow(() -> propagate(new SparkJobException("Can't find jar path to upload"))),
162+
submitModel.getSubmissionParameter().getClusterName(),
163+
ctrlSubject)
164+
.subscribeOn(IdeaSchedulers.processBarVisibleAsync(project, "Deploy the jar file into cluster")))
170165
.flatMap(clusterArtifactUriPair -> {
171-
logInfo("The Spark job is submitting ...");
172-
173166
IClusterDetail cluster = clusterArtifactUriPair.getKey();
174167
submitModel.getSubmissionParameter().setFilePath(clusterArtifactUriPair.getValue());
175-
return JobUtils.submit(cluster, submitModel.getSubmissionParameter()).subscribeOn(Schedulers.io());
168+
return JobUtils.submit(cluster, submitModel.getSubmissionParameter())
169+
.subscribeOn(IdeaSchedulers.processBarVisibleAsync(project, "Submit the Spark batch job"));
176170
})
177171
.doOnSuccess(job -> {
178172
getEventSubject().onNext(new SparkBatchJobSubmissionEvent(
@@ -201,15 +195,15 @@ public void start() {
201195
try {
202196
final String state = job.getState();
203197

204-
if (state.equals("starting") || state.equals("not_started")) {
198+
if (state.equals("starting") || state.equals("not_started") || state.equals("running")) {
205199
logInfo("Job is waiting for start due to cluster busy, please wait or disconnect (The job will run when the cluster is free).");
206200

207201
return Observable.timer(5, TimeUnit.SECONDS);
208202
}
209203
} catch (IOException ignored) {
210204
}
211205

212-
return Observable.error(new SparkJobException("Spark Job Service not available, please check HDInsight cluster status."));
206+
return Observable.error(new SparkJobException("Spark Job Service not available, please check HDInsight cluster status.", err));
213207
})))
214208
.flatMap(runningJob -> runningJob.getJobDoneObservable().subscribeOn(Schedulers.io()))
215209
.subscribe(sdPair -> {

PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/intellij/rxjava/ApplicationManagerInvokeAndWaitExecutor.java

Lines changed: 0 additions & 35 deletions
This file was deleted.

PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/intellij/rxjava/IdeaSchedulers.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,29 @@
2222

2323
package com.microsoft.intellij.rxjava;
2424

25+
import com.intellij.openapi.application.ApplicationManager;
26+
import com.intellij.openapi.progress.ProgressIndicator;
27+
import com.intellij.openapi.progress.ProgressManager;
28+
import com.intellij.openapi.progress.Task;
29+
import com.intellij.openapi.progress.impl.BackgroundableProcessIndicator;
30+
import com.intellij.openapi.project.Project;
31+
import org.jetbrains.annotations.NotNull;
2532
import rx.Scheduler;
2633
import rx.schedulers.Schedulers;
2734

2835
public class IdeaSchedulers {
29-
public static Scheduler processBarVisable() {
30-
return Schedulers.from(new ApplicationManagerInvokeAndWaitExecutor());
36+
public static Scheduler processBarVisibleAsync(@NotNull Project project, @NotNull String title) {
37+
return Schedulers.from(command -> ApplicationManager.getApplication().invokeLater(() -> {
38+
final Task.Backgroundable task = new Task.Backgroundable(project, title) {
39+
@Override
40+
public void run(@NotNull ProgressIndicator progressIndicator) {
41+
command.run();
42+
}
43+
};
44+
45+
final ProgressIndicator progressIndicator = new BackgroundableProcessIndicator(task);
46+
47+
ProgressManager.getInstance().runProcessWithProgressAsynchronously(task, progressIndicator);
48+
}));
3149
}
3250
}

Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/sdk/rest/livy/interactive/api/PostSessions.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
package com.microsoft.azure.hdinsight.sdk.rest.livy.interactive.api;
2424

25+
import com.fasterxml.jackson.annotation.JsonInclude;
2526
import com.microsoft.azure.hdinsight.sdk.rest.IConvertible;
2627
import com.microsoft.azure.hdinsight.sdk.rest.livy.interactive.SessionKind;
2728

@@ -45,6 +46,7 @@
4546
* Response Type
4647
* @see com.microsoft.azure.hdinsight.sdk.rest.livy.interactive.Session
4748
*/
49+
@JsonInclude(JsonInclude.Include.NON_NULL)
4850
public class PostSessions implements IConvertible {
4951
private SessionKind kind; // The session kind (required)
5052
private String proxyUser; // User to impersonate when starting the session

Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/jobs/JobUtils.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,8 @@ public static String uploadFileToHDFS(@NotNull IClusterDetail selectedClusterDet
532532
String sessionName = "Helper session to upload " + destUri.toString();
533533
URI livyUri = getLivyBaseUri(selectedClusterDetail);
534534

535+
logSubject.onNext(new SimpleImmutableEntry<>(Info, "Create Spark helper interactive session..."));
536+
535537
return Observable.using(() -> new SparkSession(sessionName, livyUri, username, password),
536538
SparkSession::create,
537539
SparkSession::close)
@@ -544,7 +546,7 @@ public static String uploadFileToHDFS(@NotNull IClusterDetail selectedClusterDet
544546
try {
545547
inFile = new BufferedInputStream(new FileInputStream(srcJarFile));
546548

547-
logSubject.onNext(new SimpleImmutableEntry<>(Info, String.format("Uploading to %s...", destUri)));
549+
logSubject.onNext(new SimpleImmutableEntry<>(Info, String.format("Uploading %s...", srcJarFile)));
548550
IOUtils.copy(inFile, base64Enc);
549551

550552
inFile.close();

0 commit comments

Comments
 (0)