@@ -22,12 +22,12 @@ import java.util.UUID
22
22
import java .util .regex .Pattern
23
23
24
24
import com .google .common .io .PatternFilenameFilter
25
+ import io .fabric8 .kubernetes .api .model .Pod
25
26
import org .scalatest .{BeforeAndAfter , BeforeAndAfterAll , FunSuite }
26
27
import org .scalatest .concurrent .{Eventually , PatienceConfiguration }
27
28
import org .scalatest .time .{Minutes , Seconds , Span }
28
29
29
30
import org .apache .spark .deploy .k8s .integrationtest .backend .IntegrationTestBackendFactory
30
- import org .apache .spark .deploy .k8s .integrationtest .constants .MINIKUBE_TEST_BACKEND
31
31
import org .apache .spark .deploy .k8s .integrationtest .constants .SPARK_DISTRO_PATH
32
32
33
33
private [spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfter {
@@ -66,20 +66,56 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
66
66
runSparkPiAndVerifyCompletion()
67
67
}
68
68
69
// Verifies that spark-submit accepts a k8s:// master URL whose inner address
// carries no http(s) scheme: rebuild "k8s://host:port" from the live client's
// master URL and run SparkPi against it.
test("Run SparkPi with a master URL without a scheme.") {
  val masterUrl = kubernetesTestComponents.kubernetesClient.getMasterUrl
  val schemelessMaster = s"k8s://${masterUrl.getHost}:${masterUrl.getPort}"
  sparkAppConf.set("spark.master", schemelessMaster)
  runSparkPiAndVerifyCompletion()
}
74
+
75
// Verifies that driver-pod customizations configured through spark conf —
// pod name, labels, annotations, and driver environment variables — are all
// applied to the driver pod that gets created.
test("Run SparkPi with custom driver pod name, labels, annotations, and environment variables.") {
  sparkAppConf
    .set("spark.kubernetes.driver.pod.name", "spark-integration-spark-pi")
    .set("spark.kubernetes.driver.label.label1", "label1-value")
    .set("spark.kubernetes.driver.label.label2", "label2-value")
    .set("spark.kubernetes.driver.annotation.annotation1", "annotation1-value")
    .set("spark.kubernetes.driver.annotation.annotation2", "annotation2-value")
    .set("spark.kubernetes.driverEnv.ENV1", "VALUE1")
    .set("spark.kubernetes.driverEnv.ENV2", "VALUE2")

  // Named checker (instead of an inline lambda) asserting every customization
  // above actually landed on the created driver pod.
  def checkCustomizedDriverPod(driverPod: Pod): Unit = {
    doBasicDriverPodCheck(driverPod)

    val metadata = driverPod.getMetadata
    assert(metadata.getName === "spark-integration-spark-pi")
    assert(metadata.getLabels.get("label1") === "label1-value")
    assert(metadata.getLabels.get("label2") === "label2-value")
    assert(metadata.getAnnotations.get("annotation1") === "annotation1-value")
    assert(metadata.getAnnotations.get("annotation2") === "annotation2-value")

    // NOTE(review): asserts env vars appear in insertion order (ENV1 then
    // ENV2) — assumes the driver builder preserves conf ordering; confirm if
    // this test ever becomes flaky.
    val driverEnv = driverPod.getSpec.getContainers.get(0).getEnv
    assert(driverEnv.size() == 2)
    assert(driverEnv.get(0).getName === "ENV1")
    assert(driverEnv.get(0).getValue === "VALUE1")
    assert(driverEnv.get(1).getName === "ENV2")
    assert(driverEnv.get(1).getValue === "VALUE2")
  }

  runSparkPiAndVerifyCompletion(driverPodChecker = checkCustomizedDriverPod)
}
101
+
69
102
/**
 * Runs the SparkPi example application and waits for it to complete
 * successfully, then applies the given checks to the driver pod.
 *
 * @param appResource application jar to submit; defaults to the examples jar
 *                    shipped in the Spark distribution under test.
 * @param driverPodChecker assertions to run against the created driver pod;
 *                         defaults to the basic spark-role label check.
 */
private def runSparkPiAndVerifyCompletion(
    appResource: String = CONTAINER_LOCAL_SPARK_DISTRO_EXAMPLES_JAR,
    driverPodChecker: Pod => Unit = doBasicDriverPodCheck): Unit = {
  // Delegate with named arguments for readability; "Pi is roughly 3" is the
  // log line SparkPi prints on success.
  runSparkApplicationAndVerifyCompletion(
    appResource = appResource,
    mainClass = SPARK_PI_MAIN_CLASS,
    expectedLogOnCompletion = Seq("Pi is roughly 3"),
    appArgs = Array.empty[String],
    driverPodChecker = driverPodChecker)
}
77
112
78
113
private def runSparkApplicationAndVerifyCompletion (
79
114
appResource : String ,
80
115
mainClass : String ,
81
116
expectedLogOnCompletion : Seq [String ],
82
- appArgs : Array [String ]): Unit = {
117
+ appArgs : Array [String ],
118
+ driverPodChecker : Pod => Unit ): Unit = {
83
119
val appArguments = SparkAppArguments (
84
120
mainAppResource = appResource,
85
121
mainClass = mainClass)
@@ -90,6 +126,7 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
90
126
.list()
91
127
.getItems
92
128
.get(0 )
129
+ driverPodChecker(driverPod)
93
130
Eventually .eventually(TIMEOUT , INTERVAL ) {
94
131
expectedLogOnCompletion.foreach { e =>
95
132
assert(kubernetesTestComponents.kubernetesClient
@@ -100,6 +137,10 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
100
137
}
101
138
}
102
139
}
140
+
141
/**
 * Baseline assertion applied to every driver pod: Spark must label the
 * driver pod with spark-role=driver.
 */
private def doBasicDriverPodCheck(driverPod: Pod): Unit = {
  val role = driverPod.getMetadata.getLabels.get("spark-role")
  assert(role === "driver")
}
103
144
}
104
145
105
146
private [spark] object KubernetesSuite {
0 commit comments