Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 2fe1633

Browse files
liyinan926Marcelo Vanzin
authored andcommitted
[SPARK-22778][KUBERNETES] Added the missing service metadata for KubernetesClusterManager
## What changes were proposed in this pull request? This PR added the missing service metadata for `KubernetesClusterManager`. Without the metadata, the service loader couldn't load `KubernetesClusterManager`, and caused the driver to fail to create a `ExternalClusterManager`, as being reported in SPARK-22778. The PR also changed the `k8s:` prefix used to `k8s://`, which is what existing Spark on k8s users are familiar and used to. ## How was this patch tested? Manual testing verified that the fix resolved the issue in SPARK-22778. /cc vanzin felixcheung jiangxb1987 Author: Yinan Li <[email protected]> Closes apache#19972 from liyinan926/fix-22778.
1 parent 6d99940 commit 2fe1633

File tree

5 files changed

+11
-10
lines changed

5 files changed

+11
-10
lines changed

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2757,7 +2757,7 @@ private[spark] object Utils extends Logging {
27572757

27582758
/**
27592759
* Check the validity of the given Kubernetes master URL and return the resolved URL. Prefix
2760-
* "k8s:" is appended to the resolved URL as the prefix is used by KubernetesClusterManager
2760+
* "k8s://" is appended to the resolved URL as the prefix is used by KubernetesClusterManager
27612761
* in canCreate to determine if the KubernetesClusterManager should be used.
27622762
*/
27632763
def checkAndGetK8sMasterUrl(rawMasterURL: String): String = {
@@ -2770,7 +2770,7 @@ private[spark] object Utils extends Logging {
27702770
val resolvedURL = s"https://$masterWithoutK8sPrefix"
27712771
logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved " +
27722772
s"URL is $resolvedURL.")
2773-
return s"k8s:$resolvedURL"
2773+
return s"k8s://$resolvedURL"
27742774
}
27752775

27762776
val masterScheme = new URI(masterWithoutK8sPrefix).getScheme
@@ -2789,7 +2789,7 @@ private[spark] object Utils extends Logging {
27892789
throw new IllegalArgumentException("Invalid Kubernetes master scheme: " + masterScheme)
27902790
}
27912791

2792-
return s"k8s:$resolvedURL"
2792+
s"k8s://$resolvedURL"
27932793
}
27942794
}
27952795

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -408,7 +408,7 @@ class SparkSubmitSuite
408408
childArgsMap.get("--arg") should be (Some("arg1"))
409409
mainClass should be (KUBERNETES_CLUSTER_SUBMIT_CLASS)
410410
classpath should have length (0)
411-
conf.get("spark.master") should be ("k8s:https://host:port")
411+
conf.get("spark.master") should be ("k8s://https://host:port")
412412
conf.get("spark.executor.memory") should be ("5g")
413413
conf.get("spark.driver.memory") should be ("4g")
414414
conf.get("spark.kubernetes.namespace") should be ("spark")

core/src/test/scala/org/apache/spark/util/UtilsSuite.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1148,16 +1148,16 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
11481148

11491149
test("check Kubernetes master URL") {
11501150
val k8sMasterURLHttps = Utils.checkAndGetK8sMasterUrl("k8s://https://host:port")
1151-
assert(k8sMasterURLHttps === "k8s:https://host:port")
1151+
assert(k8sMasterURLHttps === "k8s://https://host:port")
11521152

11531153
val k8sMasterURLHttp = Utils.checkAndGetK8sMasterUrl("k8s://http://host:port")
1154-
assert(k8sMasterURLHttp === "k8s:http://host:port")
1154+
assert(k8sMasterURLHttp === "k8s://http://host:port")
11551155

11561156
val k8sMasterURLWithoutScheme = Utils.checkAndGetK8sMasterUrl("k8s://127.0.0.1:8443")
1157-
assert(k8sMasterURLWithoutScheme === "k8s:https://127.0.0.1:8443")
1157+
assert(k8sMasterURLWithoutScheme === "k8s://https://127.0.0.1:8443")
11581158

11591159
val k8sMasterURLWithoutScheme2 = Utils.checkAndGetK8sMasterUrl("k8s://127.0.0.1")
1160-
assert(k8sMasterURLWithoutScheme2 === "k8s:https://127.0.0.1")
1160+
assert(k8sMasterURLWithoutScheme2 === "k8s://https://127.0.0.1")
11611161

11621162
intercept[IllegalArgumentException] {
11631163
Utils.checkAndGetK8sMasterUrl("k8s:https://host:port")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,8 +203,8 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
203203
val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
204204
val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
205205
// The master URL has been checked for validity already in SparkSubmit.
206-
// We just need to get rid of the "k8s:" prefix here.
207-
val master = sparkConf.get("spark.master").substring("k8s:".length)
206+
// We just need to get rid of the "k8s://" prefix here.
207+
val master = sparkConf.get("spark.master").substring("k8s://".length)
208208
val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None
209209

210210
val loggingPodStatusWatcher = new LoggingPodStatusWatcherImpl(

0 commit comments

Comments
 (0)