|
35 | 35 | import com.intellij.execution.runners.ExecutionEnvironmentBuilder; |
36 | 36 | import com.intellij.execution.ui.ConsoleViewContentType; |
37 | 37 | import com.intellij.execution.ui.RunContentDescriptor; |
| 38 | +import com.intellij.openapi.application.ModalityState; |
38 | 39 | import com.intellij.openapi.project.Project; |
39 | 40 | import com.intellij.openapi.util.Key; |
40 | 41 | import com.microsoft.azure.hdinsight.common.ClusterManagerEx; |
|
55 | 56 | import org.jetbrains.concurrency.AsyncPromise; |
56 | 57 | import org.jetbrains.concurrency.Promise; |
57 | 58 | import rx.Observable; |
58 | | -import rx.Observer; |
59 | 59 | import rx.Subscription; |
60 | 60 | import rx.schedulers.Schedulers; |
61 | 61 | import rx.subjects.PublishSubject; |
@@ -232,6 +232,18 @@ protected void execute(ExecutionEnvironment environment, Callback callback, RunP |
232 | 232 | jdbReadyEvent.getRemoteHost().orElse("unknown"), |
233 | 233 | jdbReadyEvent.isDriver()); |
234 | 234 |
|
| 235 | + final RunProfile runProfile = forkEnv.getRunProfile(); |
| 236 | + if (!(runProfile instanceof LivySparkBatchJobRunConfiguration)) { |
| 237 | + ctrlSubject.onError(new UnsupportedOperationException( |
| 238 | + "Only supports LivySparkBatchJobRunConfiguration type, but got type" |
| 239 | + + runProfile.getClass().getCanonicalName())); |
| 240 | + |
| 241 | + return; |
| 242 | + } |
| 243 | + |
| 244 | + // Reuse the driver's Spark batch job |
| 245 | + ((LivySparkBatchJobRunConfiguration) runProfile).setSparkRemoteBatch(sparkDebugBatch); |
| 246 | + |
235 | 247 | SparkBatchRemoteDebugState forkState = jdbReadyEvent.isDriver() ? |
236 | 248 | submissionState : |
237 | 249 | (SparkBatchRemoteDebugState) forkEnv.getState(); |
@@ -325,7 +337,9 @@ public Promise<RunContentDescriptor> executeAsync(@NotNull RunProfileState state |
325 | 337 |
|
326 | 338 | return jobDriverEnvReady |
327 | 339 | .then(forkEnv -> Observable.fromCallable(() -> doExecute(state, forkEnv)) |
328 | | - .subscribeOn(schedulers.dispatchUIThread()).toBlocking().singleOrDefault(null)) |
| 340 | + .subscribeOn(schedulers.dispatchUIThread(ModalityState.defaultModalityState())) |
| 341 | + .toBlocking() |
| 342 | + .singleOrDefault(null)) |
329 | 343 | .then(descriptor -> { |
330 | 344 | // Borrow BaseProgramRunner.postProcess() codes since it's only package public accessible. |
331 | 345 | if (descriptor != null) { |
|
0 commit comments