4040import com .microsoft .azure .hdinsight .common .ClusterManagerEx ;
4141import com .microsoft .azure .hdinsight .common .HDInsightUtil ;
4242import com .microsoft .azure .hdinsight .common .JobStatusManager ;
43+ import com .microsoft .azure .hdinsight .common .mvc .IdeSchedulers ;
4344import com .microsoft .azure .hdinsight .sdk .cluster .IClusterDetail ;
4445import com .microsoft .azure .hdinsight .sdk .common .HDIException ;
4546import com .microsoft .azure .hdinsight .spark .common .*;
4647import com .microsoft .azure .hdinsight .spark .jobs .JobUtils ;
4748import com .microsoft .azure .hdinsight .spark .run .configuration .RemoteDebugRunConfiguration ;
49+ import com .microsoft .intellij .rxjava .IdeaSchedulers ;
4850import org .apache .commons .lang .StringUtils ;
4951import org .apache .http .auth .AuthScope ;
5052import org .apache .http .auth .UsernamePasswordCredentials ;
@@ -125,6 +127,7 @@ protected void execute(@NotNull ExecutionEnvironment environment, @Nullable Call
125127 SparkBatchJobSubmissionState submissionState = (SparkBatchJobSubmissionState ) state ;
126128 SparkSubmitModel submitModel = submissionState .getSubmitModel ();
127129 Project project = submitModel .getProject ();
130+ IdeSchedulers schedulers = new IdeaSchedulers (project );
128131
129132 submissionState .checkSubmissionParameter ();
130133
@@ -138,8 +141,9 @@ protected void execute(@NotNull ExecutionEnvironment environment, @Nullable Call
138141 // Reset the debug process Phaser
139142 debugProcessPhaser = new Phaser (1 );
140143
141- Observable .create ((Observable .OnSubscribe <String >) ob ->
142- createDebugJobSession (submitModel , ob ).subscribe (debugJobClusterPair -> {
144+ Observable .create ((Observable .OnSubscribe <String >) ob -> createDebugJobSession (submitModel , ob , schedulers )
145+ .observeOn (schedulers .processBarVisibleAsync ("Spark batch job is debugging" ))
146+ .subscribe (debugJobClusterPair -> {
143147 final SparkBatchRemoteDebugJob remoteDebugJob = debugJobClusterPair .getKey ();
144148 final IClusterDetail clusterDetail = debugJobClusterPair .getValue ();
145149 final CredentialsProvider credentialsProvider = new BasicCredentialsProvider ();
@@ -164,6 +168,7 @@ protected void execute(@NotNull ExecutionEnvironment environment, @Nullable Call
164168 submissionState ,
165169 true ,
166170 ob ,
171+ schedulers ,
167172 debugProcessPhaser ,
168173 driverHost ,
169174 driverDebugPort ,
@@ -246,7 +251,7 @@ private ExecutionEnvironment buildChildEnvironment(@NotNull ExecutionEnvironment
246251 * Create a Debug Spark Job session with building, deploying and submitting
247252 */
248253 private Single <SimpleEntry <SparkBatchRemoteDebugJob , IClusterDetail >> createDebugJobSession (
249- @ NotNull SparkSubmitModel submitModel , Subscriber <? super String > debugSessionSub ) {
254+ @ NotNull SparkSubmitModel submitModel , Subscriber <? super String > debugSessionSub , IdeSchedulers schedulers ) {
250255 SparkSubmissionParameter submissionParameter = submitModel .getSubmissionParameter ();
251256 String selectedClusterName = submissionParameter .getClusterName ();
252257
@@ -271,7 +276,8 @@ private Single<SimpleEntry<SparkBatchRemoteDebugJob, IClusterDetail>> createDebu
271276 } catch (Exception e ) {
272277 ob .onError (e );
273278 }
274- }).subscribeOn (Schedulers .io ()))
279+ }).subscribeOn (schedulers .processBarVisibleAsync ("Deploy the jar file into cluster" )))
280+ .observeOn (schedulers .processBarVisibleAsync ("Submit the Spark batch job" ))
275281 .map ((selectedClusterDetail ) -> {
276282 // Create Batch Spark Debug Job
277283 try {
@@ -395,6 +401,7 @@ private void createDebugProcess(SparkBatchRemoteDebugJob remoteDebugJob,
395401 @ NotNull SparkBatchJobSubmissionState submissionState ,
396402 boolean isDriver ,
397403 @ NotNull Subscriber <? super String > debugSessionSubscriber ,
404+ IdeSchedulers schedulers ,
398405 @ NotNull Phaser debugPhaser ,
399406 String remoteHost ,
400407 int remotePort ,
@@ -460,6 +467,7 @@ private void createDebugProcess(SparkBatchRemoteDebugJob remoteDebugJob,
460467 newExecutorState ,
461468 false ,
462469 debugSessionSubscriber ,
470+ schedulers ,
463471 debugPhaser ,
464472 host ,
465473 executorJdbPort ,
@@ -518,7 +526,6 @@ public void processTerminated(ProcessEvent processEvent) {
518526 ob .onError (e );
519527 }
520528 })
521- .subscribeOn (Schedulers .io ())
522529 .subscribe (debugProcessConsole ::onNext , debugSessionSubscriber ::onError , debugPhaser ::arriveAndDeregister );
523530 }
524531
0 commit comments