@@ -68,9 +68,6 @@ public class SparkBatchJobRemoteProcess extends RemoteProcess {
6868
6969 private boolean isDisconnected ;
7070
71- @ Nullable
72- private Subscription jobLogSubscription ;
73-
7471 public SparkBatchJobRemoteProcess (@ NotNull Project project , @ NotNull SparkSubmitModel sparkSubmitModel ,
7572 @ NotNull PublishSubject <SimpleImmutableEntry <MessageInfoType , String >> ctrlSubject )
7673 throws ExecutionException {
@@ -162,50 +159,17 @@ public void start() {
162159 submitModel .getSubmissionParameter ().getClusterName (),
163160 ctrlSubject )
164161 .subscribeOn (IdeaSchedulers .processBarVisibleAsync (project , "Deploy the jar file into cluster" )))
165- .flatMap (clusterArtifactUriPair -> {
166- IClusterDetail cluster = clusterArtifactUriPair .getKey ();
167- submitModel .getSubmissionParameter ().setFilePath (clusterArtifactUriPair .getValue ());
168- return JobUtils .submit (cluster , submitModel .getSubmissionParameter ())
169- .subscribeOn (IdeaSchedulers .processBarVisibleAsync (project , "Submit the Spark batch job" ));
170- })
171- .doOnSuccess (job -> {
172- getEventSubject ().onNext (new SparkBatchJobSubmissionEvent (
173- SparkBatchJobSubmissionEvent .Type .SUBMITTED , job ));
174-
175- jobLogSubscription = job .getSubmissionLog ()
176- .subscribeOn (Schedulers .io ())
177- .subscribe (ctrlSubject ::onNext , ctrlSubject ::onError );
178- })
179162 .toObservable ()
180- .flatMap (job -> Observable
181- .create ((Subscriber <? super SparkBatchJob > ob ) -> {
182- try {
183- jobStderrLogInputSteam .attachJob (job );
184- jobStdoutLogInputSteam .attachJob (job );
185-
186- sparkJob = job ;
187-
188- ob .onNext (job );
189- ob .onCompleted ();
190- } catch (IOException e ) {
191- ob .onError (e );
192- }
193- })
194- .retryWhen (attempts -> attempts .flatMap (err -> {
195- try {
196- final String state = job .getState ();
197-
198- if (state .equals ("starting" ) || state .equals ("not_started" ) || state .equals ("running" )) {
199- logInfo ("Job is waiting for start due to cluster busy, please wait or disconnect (The job will run when the cluster is free)." );
200-
201- return Observable .timer (5 , TimeUnit .SECONDS );
202- }
203- } catch (IOException ignored ) {
204- }
205-
206- return Observable .error (new SparkJobException ("Spark Job Service not available, please check HDInsight cluster status." , err ));
207- })))
208- .flatMap (runningJob -> runningJob .getJobDoneObservable ().subscribeOn (Schedulers .io ()))
163+ .flatMap (this ::submitJob )
164+ .flatMap (job -> Observable .zip (
165+ attachJobInputStream (jobStderrLogInputSteam , job ),
166+ attachJobInputStream (jobStdoutLogInputSteam , job ),
167+ (job1 , job2 ) -> {
168+ sparkJob = job ;
169+ return job ;
170+ }))
171+ .flatMap (runningJob -> runningJob .getJobDoneObservable ()
172+ .subscribeOn (IdeaSchedulers .processBarVisibleAsync (project , "Spark batch job is running" )))
209173 .subscribe (sdPair -> {
210174 if (sdPair .getKey () == SparkBatchJobState .SUCCESS ) {
211175 logInfo ("Job run successfully." );
@@ -221,9 +185,28 @@ public void start() {
221185 });
222186 }
223187
188+ private Observable <SparkBatchJob > attachJobInputStream (SparkJobLogInputStream inputStream , SparkBatchJob job ) {
189+ return Observable .just (inputStream )
190+ .map (stream -> stream .attachJob (job ))
191+ .subscribeOn (IdeaSchedulers .processBarVisibleAsync (project , "Attach Spark batch job outputs " + inputStream .getLogType ()))
192+ .retryWhen (attempts -> attempts .flatMap (err -> {
193+ try {
194+ final String state = job .getState ();
195+
196+ if (state .equals ("starting" ) || state .equals ("not_started" ) || state .equals ("running" )) {
197+ logInfo ("Job is waiting for start due to cluster busy, please wait or disconnect (The job will run when the cluster is free)." );
198+
199+ return Observable .timer (5 , TimeUnit .SECONDS );
200+ }
201+ } catch (IOException ignored ) {
202+ }
203+
204+ return Observable .error (new SparkJobException ("Spark Job Service not available, please check HDInsight cluster status." , err ));
205+ }));
206+ }
207+
224208 public void disconnect () {
225209 this .isDisconnected = true ;
226- Optional .ofNullable (this .jobLogSubscription ).ifPresent (Subscription ::unsubscribe );
227210
228211 this .ctrlSubject .onCompleted ();
229212 this .eventSubject .onCompleted ();
@@ -239,4 +222,24 @@ private void logInfo(String message) {
239222 public PublishSubject <SparkBatchJobSubmissionEvent > getEventSubject () {
240223 return eventSubject ;
241224 }
225+
226+ private Observable <SparkBatchJob > startJobSubmissionLogReceiver (SparkBatchJob job ) {
227+ getEventSubject ().onNext (new SparkBatchJobSubmissionEvent (SparkBatchJobSubmissionEvent .Type .SUBMITTED , job ));
228+
229+ return job .getSubmissionLog ()
230+ .doOnNext (ctrlSubject ::onNext )
231+ .doOnError (ctrlSubject ::onError )
232+ .last ()
233+ .map (messageTypeText -> job );
234+
235+ }
236+
237+ private Observable <SparkBatchJob > submitJob (SimpleImmutableEntry <IClusterDetail , String > clusterArtifactUriPair ) {
238+ IClusterDetail cluster = clusterArtifactUriPair .getKey ();
239+ submitModel .getSubmissionParameter ().setFilePath (clusterArtifactUriPair .getValue ());
240+ return JobUtils .submit (cluster , submitModel .getSubmissionParameter ())
241+ .subscribeOn (IdeaSchedulers .processBarVisibleAsync (project , "Submit the Spark batch job" ))
242+ .toObservable ()
243+ .flatMap (this ::startJobSubmissionLogReceiver ); // To receive the Livy submission log
244+ }
242245}
0 commit comments