Skip to content

Commit 92e8291

Browse files
authored
Fix cluster not exist for spark on ADL regression bug (#2197)
1 parent 5fb78cf commit 92e8291

File tree

4 files changed

+52
-11
lines changed

4 files changed

+52
-11
lines changed

PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/ui/ServerlessSparkSubmissionPanelConfigurable.kt

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,30 @@ import com.microsoft.azure.hdinsight.sdk.common.azure.serverless.AzureSparkServe
3030
import com.microsoft.azure.hdinsight.sdk.common.azure.serverless.AzureSparkServerlessClusterManager
3131
import com.microsoft.azure.hdinsight.spark.common.ServerlessSparkSubmitModel
3232
import com.microsoft.azure.hdinsight.spark.common.SparkSubmitModel
33+
import org.apache.commons.lang3.StringUtils
3334
import rx.Observable
35+
import java.util.stream.Collectors
3436

3537
class ServerlessSparkSubmissionPanelConfigurable(project: Project, submissionPanel: SparkSubmissionContentPanel)
3638
: SparkSubmissionContentPanelConfigurable(project, submissionPanel), ILogger {
39+
40+
init {
41+
jobUploadStorageCtrl = object : SparkSubmissionJobUploadStorageCtrl(storageWithUploadPathPanel) {
42+
override fun getClusterName(): String? = selectedClusterDetail?.name
43+
44+
override fun getClusterDetail(): IClusterDetail? {
45+
if (StringUtils.isEmpty(getClusterName())) {
46+
return null
47+
}
48+
return AzureSparkServerlessClusterManager.getInstance().clusters.stream()
49+
.filter { clusterDetail -> clusterDetail.name == getClusterName() }
50+
.collect(Collectors.toList())
51+
.getOrNull(0)
52+
}
53+
}
54+
55+
}
56+
3757
override fun getType(): String = "Azure Data Lake Spark Pool"
3858

3959
override fun getClusterDetails(): ImmutableSortedSet<out IClusterDetail> {

PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/ui/SparkSubmissionContentPanelConfigurable.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@ public class SparkSubmissionContentPanelConfigurable implements SettableControl<
6666
private final Project myProject;
6767

6868
private SparkSubmissionContentPanel submissionPanel;
69-
private SparkSubmissionJobUploadStorageWithUploadPathPanel storageWithUploadPathPanel;
70-
private SparkSubmissionJobUploadStorageCtrl jobUploadStorageCtrl;
69+
protected SparkSubmissionJobUploadStorageWithUploadPathPanel storageWithUploadPathPanel;
70+
protected SparkSubmissionJobUploadStorageCtrl jobUploadStorageCtrl;
7171
private JPanel myWholePanel;
7272

7373
// Cluster refresh publish subject with preselected cluster name as event
@@ -79,6 +79,7 @@ public SparkSubmissionContentPanelConfigurable(@NotNull Project project, @NotNul
7979
this.storageWithUploadPathPanel = submissionPanel.storageWithUploadPathPanel;
8080
this.myProject = project;
8181

82+
/*
8283
this.jobUploadStorageCtrl = new SparkSubmissionJobUploadStorageCtrl(this.storageWithUploadPathPanel) {
8384
@Nullable
8485
@Override
@@ -87,6 +88,7 @@ public String getClusterName() {
8788
return clusterDetail == null ? null : clusterDetail.getName();
8889
}
8990
};
91+
*/
9092
this.clustersRefreshSub = BehaviorSubject.create();
9193
}
9294

PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/ui/SparkSubmissionDebuggablePanelConfigurable.kt

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ package com.microsoft.azure.hdinsight.spark.ui
2424

2525
import com.intellij.execution.configurations.RuntimeConfigurationError
2626
import com.intellij.openapi.project.Project
27+
import com.microsoft.azure.hdinsight.common.ClusterManagerEx
2728
import com.microsoft.azure.hdinsight.sdk.cluster.IClusterDetail
2829
import com.microsoft.azure.hdinsight.spark.common.SparkBatchRemoteDebugJobSshAuth.SSHAuthType.UsePassword
2930
import com.microsoft.azure.hdinsight.spark.common.SparkBatchRemoteDebugJobSshAuth.SSHAuthType.UseKeyFile
@@ -43,6 +44,19 @@ class SparkSubmissionDebuggablePanelConfigurable(project: Project,
4344
override fun getClusterNameToCheck(): String? = selectedClusterDetail?.name
4445
}
4546

47+
init {
48+
jobUploadStorageCtrl = object : SparkSubmissionJobUploadStorageCtrl(storageWithUploadPathPanel) {
49+
override fun getClusterName(): String? = selectedClusterDetail?.name
50+
51+
override fun getClusterDetail(): IClusterDetail? {
52+
if (StringUtils.isEmpty(getClusterName())) {
53+
return null
54+
}
55+
return ClusterManagerEx.getInstance().getClusterDetailByName(getClusterName()).orElse(null)
56+
}
57+
}
58+
}
59+
4660
override fun onClusterSelected(cluster: IClusterDetail) {
4761
super.onClusterSelected(cluster)
4862

PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/ui/SparkSubmissionJobUploadStorageCtrl.kt

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ package com.microsoft.azure.hdinsight.spark.ui
3030
import com.microsoft.azure.hdinsight.common.ClusterManagerEx
3131
import com.microsoft.azure.hdinsight.common.logger.ILogger
3232
import com.microsoft.azure.hdinsight.sdk.cluster.IClusterDetail
33+
import com.microsoft.azure.hdinsight.sdk.common.azure.serverless.AzureSparkServerlessCluster
3334
import com.microsoft.azure.hdinsight.spark.common.SparkSubmitJobUploadStorageModel
3435
import com.microsoft.azure.hdinsight.spark.common.SparkSubmitStorageType
3536
import com.microsoft.tooling.msservices.helpers.azure.sdk.StorageClientSDKManager
@@ -104,12 +105,7 @@ abstract class SparkSubmissionJobUploadStorageCtrl(val view: SparkSubmissionJobU
104105
{ },
105106
{ err -> log().warn(ExceptionUtils.getStackTrace(err)) })
106107

107-
private fun getClusterDetail(): IClusterDetail? {
108-
if (StringUtils.isEmpty(getClusterName())) {
109-
return null
110-
}
111-
return ClusterManagerEx.getInstance().getClusterDetailByName(getClusterName()).orElse(null)
112-
}
108+
abstract fun getClusterDetail(): IClusterDetail?
113109

114110
private fun validateStorageInfo(selectedItem: String): Observable<SparkSubmitJobUploadStorageModel> {
115111
return Observable.just(SparkSubmitJobUploadStorageModel())
@@ -136,10 +132,10 @@ abstract class SparkSubmissionJobUploadStorageCtrl(val view: SparkSubmissionJobU
136132
try {
137133
clusterDetail.getConfigurationInfo()
138134
val defaultStorageAccount = clusterDetail.storageAccount
139-
val defaultStorageContainer = defaultStorageAccount.defaultContainerOrRootPath
140-
if (defaultStorageAccount != null && defaultStorageContainer != null) {
135+
val defaultContainerOrRootPath = defaultStorageAccount.defaultContainerOrRootPath
136+
if (defaultStorageAccount != null && defaultContainerOrRootPath != null) {
141137
errorMsg = null
142-
uploadPath = getAzureBlobStoragePath(defaultStorageAccount.name, defaultStorageContainer)
138+
uploadPath = if (clusterDetail is AzureSparkServerlessCluster) getAzureDataLakeStoragePath(defaultContainerOrRootPath) else getAzureBlobStoragePath(defaultStorageAccount.name, defaultContainerOrRootPath)
143139
storageAccountType = SparkSubmitStorageType.DEFAULT_STORAGE_ACCOUNT
144140
} else {
145141
errorMsg = "Cluster have no storage account or storage container"
@@ -241,4 +237,13 @@ abstract class SparkSubmissionJobUploadStorageCtrl(val view: SparkSubmissionJobU
241237
return if (StringUtils.isEmpty(storageAccountName) || StringUtils.isEmpty(container)) null else
242238
"wasbs://$container@${ClusterManagerEx.getInstance().getBlobFullName(storageAccountName)}/SparkSubmission/"
243239
}
240+
241+
private fun getRootPathEndsWithSlash(rootPath: String): String {
242+
return if (rootPath.endsWith("/")) rootPath else "$rootPath/"
243+
}
244+
245+
private fun getAzureDataLakeStoragePath(rootPath: String?): String? {
246+
return if (StringUtils.isEmpty(rootPath)) null else
247+
"${getRootPathEndsWithSlash(rootPath.orEmpty())}SparkSubmission/"
248+
}
244249
}

0 commit comments

Comments
 (0)