@@ -24,7 +24,7 @@ import java.util.regex.Pattern
 import scala.collection.JavaConverters._
 
 import com.google.common.io.PatternFilenameFilter
-import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.api.model.{Container, Pod}
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
 import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
 import org.scalatest.time.{Minutes, Seconds, Span}
@@ -52,6 +52,7 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
   before {
     sparkAppConf = kubernetesTestComponents.newSparkAppConf()
      .set("spark.kubernetes.driver.label.spark-app-locator", APP_LOCATOR_LABEL)
+      .set("spark.kubernetes.executor.label.spark-app-locator", APP_LOCATOR_LABEL)
    kubernetesTestComponents.createNamespace()
  }
 
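A note on the locator label set above: tagging both driver and executor pods with a per-run value is what lets the suite find exactly the pods it launched, even on a shared cluster. The diff does not show how APP_LOCATOR_LABEL is generated, so the sketch below is a hypothetical illustration of one common approach:

import java.util.UUID

// Hypothetical sketch: one random locator value per test run; both the
// driver and executor label settings above would reference this value.
val appLocatorLabel: String = UUID.randomUUID().toString.replaceAll("-", "")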
@@ -70,10 +71,25 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
 
   test("Run SparkPi with a master URL without a scheme.") {
     val url = kubernetesTestComponents.kubernetesClient.getMasterUrl
-    sparkAppConf.set("spark.master", s"k8s://${url.getHost}:${url.getPort}")
+    val k8sMasterUrl = if (url.getPort < 0) {
+      s"k8s://${url.getHost}"
+    } else {
+      s"k8s://${url.getHost}:${url.getPort}"
+    }
+    sparkAppConf.set("spark.master", k8sMasterUrl)
     runSparkPiAndVerifyCompletion()
   }
 
+  test("Run SparkPi with an argument.") {
+    runSparkPiAndVerifyCompletion(appArgs = Array("5"))
+  }
+
+  test("Run SparkPi using the remote example jar.") {
+    sparkAppConf.set("spark.kubernetes.initContainer.image",
+      System.getProperty("spark.docker.test.initContainerImage", "spark-init:latest"))
+    runSparkPiAndVerifyCompletion(appResource = REMOTE_EXAMPLES_JAR_URI)
+  }
+
   test("Run SparkPi with custom driver pod name, labels, annotations, and environment variables.") {
     sparkAppConf
       .set("spark.kubernetes.driver.pod.name", "spark-integration-spark-pi")
@@ -83,56 +99,109 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
       .set("spark.kubernetes.driver.annotation.annotation2", "annotation2-value")
       .set("spark.kubernetes.driverEnv.ENV1", "VALUE1")
       .set("spark.kubernetes.driverEnv.ENV2", "VALUE2")
-    runSparkPiAndVerifyCompletion(driverPodChecker = (driverPod: Pod) => {
-      doBasicDriverPodCheck(driverPod)
-      assert(driverPod.getMetadata.getName === "spark-integration-spark-pi")
-
-      assert(driverPod.getMetadata.getLabels.get("label1") === "label1-value")
-      assert(driverPod.getMetadata.getLabels.get("label2") === "label2-value")
-      assert(driverPod.getMetadata.getAnnotations.get("annotation1") === "annotation1-value")
-      assert(driverPod.getMetadata.getAnnotations.get("annotation2") === "annotation2-value")
-
-      val driverContainer = driverPod.getSpec.getContainers.get(0)
-      val envVars = driverContainer
-        .getEnv
-        .asScala
-        .map { env =>
-          (env.getName, env.getValue)
-        }
-        .toMap
-      assert(envVars("ENV1") === "VALUE1")
-      assert(envVars("ENV2") === "VALUE2")
-    })
+      .set("spark.kubernetes.executor.label.label1", "label1-value")
+      .set("spark.kubernetes.executor.label.label2", "label2-value")
+      .set("spark.kubernetes.executor.annotation.annotation1", "annotation1-value")
+      .set("spark.kubernetes.executor.annotation.annotation2", "annotation2-value")
+      .set("spark.executorEnv.ENV1", "VALUE1")
+      .set("spark.executorEnv.ENV2", "VALUE2")
+
+    runSparkPiAndVerifyCompletion(
+      driverPodChecker = (driverPod: Pod) => {
+        doBasicDriverPodCheck(driverPod)
+        assert(driverPod.getMetadata.getName === "spark-integration-spark-pi")
+        checkCustomSettings(driverPod)
+      },
+      executorPodChecker = (executorPod: Pod) => {
+        doBasicExecutorPodCheck(executorPod)
+        checkCustomSettings(executorPod)
+      })
+  }
+
+  test("Run SparkPi with a test secret mounted into the driver and executor pods") {
+    createTestSecret()
+    sparkAppConf
+      .set(s"spark.kubernetes.driver.secrets.$TEST_SECRET_NAME", TEST_SECRET_MOUNT_PATH)
+      .set(s"spark.kubernetes.executor.secrets.$TEST_SECRET_NAME", TEST_SECRET_MOUNT_PATH)
+    runSparkPiAndVerifyCompletion(
+      driverPodChecker = (driverPod: Pod) => {
+        doBasicDriverPodCheck(driverPod)
+        checkTestSecret(driverPod)
+      },
+      executorPodChecker = (executorPod: Pod) => {
+        doBasicExecutorPodCheck(executorPod)
+        checkTestSecret(executorPod)
+      })
+  }
+
+  test("Run SparkPi using the remote example jar with a test secret mounted into the driver and " +
+    "executor pods") {
+    sparkAppConf
+      .set(s"spark.kubernetes.driver.secrets.$TEST_SECRET_NAME", TEST_SECRET_MOUNT_PATH)
+      .set(s"spark.kubernetes.executor.secrets.$TEST_SECRET_NAME", TEST_SECRET_MOUNT_PATH)
+    sparkAppConf.set("spark.kubernetes.initContainer.image",
+      System.getProperty("spark.docker.test.initContainerImage", "spark-init:latest"))
+
+    createTestSecret()
+
+    runSparkPiAndVerifyCompletion(
+      appResource = REMOTE_EXAMPLES_JAR_URI,
+      driverPodChecker = (driverPod: Pod) => {
+        doBasicDriverPodCheck(driverPod)
+        checkTestSecret(driverPod, withInitContainer = true)
+      },
+      executorPodChecker = (executorPod: Pod) => {
+        doBasicExecutorPodCheck(executorPod)
+        checkTestSecret(executorPod, withInitContainer = true)
+      })
   }
 
   private def runSparkPiAndVerifyCompletion(
       appResource: String = CONTAINER_LOCAL_SPARK_DISTRO_EXAMPLES_JAR,
-      driverPodChecker: Pod => Unit = doBasicDriverPodCheck): Unit = {
+      driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
+      executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
+      appArgs: Array[String] = Array.empty[String]): Unit = {
     runSparkApplicationAndVerifyCompletion(
       appResource,
       SPARK_PI_MAIN_CLASS,
       Seq("Pi is roughly 3"),
-      Array.empty[String],
-      driverPodChecker)
+      appArgs,
+      driverPodChecker,
+      executorPodChecker)
   }
 
   private def runSparkApplicationAndVerifyCompletion(
       appResource: String,
       mainClass: String,
       expectedLogOnCompletion: Seq[String],
       appArgs: Array[String],
-      driverPodChecker: Pod => Unit): Unit = {
+      driverPodChecker: Pod => Unit,
+      executorPodChecker: Pod => Unit): Unit = {
     val appArguments = SparkAppArguments(
       mainAppResource = appResource,
-      mainClass = mainClass)
+      mainClass = mainClass,
+      appArgs = appArgs)
     SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt)
+
     val driverPod = kubernetesTestComponents.kubernetesClient
       .pods()
       .withLabel("spark-app-locator", APP_LOCATOR_LABEL)
+      .withLabel("spark-role", "driver")
       .list()
       .getItems
       .get(0)
     driverPodChecker(driverPod)
+
+    val executorPods = kubernetesTestComponents.kubernetesClient
+      .pods()
+      .withLabel("spark-app-locator", APP_LOCATOR_LABEL)
+      .withLabel("spark-role", "executor")
+      .list()
+      .getItems
+    executorPods.asScala.foreach { pod =>
+      executorPodChecker(pod)
+    }
+
     Eventually.eventually(TIMEOUT, INTERVAL) {
       expectedLogOnCompletion.foreach { e =>
         assert(kubernetesTestComponents.kubernetesClient
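The driver/executor lookup added above can be reproduced outside the suite with a plain fabric8 client. A sketch, assuming cluster access is already configured and with "example-locator" standing in for the suite's APP_LOCATOR_LABEL value:

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.DefaultKubernetesClient
import scala.collection.JavaConverters._

// Sketch: select one run's executor pods by the same two labels the
// suite sets; "example-locator" is a placeholder locator value.
val client = new DefaultKubernetesClient()
val executorPods: Seq[Pod] = client
  .pods()
  .withLabel("spark-app-locator", "example-locator")
  .withLabel("spark-role", "executor")
  .list()
  .getItems
  .asScala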
@@ -145,7 +214,64 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
   }
 
   private def doBasicDriverPodCheck(driverPod: Pod): Unit = {
-    assert(driverPod.getMetadata.getLabels.get("spark-role") === "driver")
+    assert(driverPod.getSpec.getContainers.get(0).getImage === "spark-driver:latest")
+    assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
+  }
+
+  private def doBasicExecutorPodCheck(executorPod: Pod): Unit = {
+    assert(executorPod.getSpec.getContainers.get(0).getImage === "spark-executor:latest")
+    assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
+  }
+
+  private def checkCustomSettings(pod: Pod): Unit = {
+    assert(pod.getMetadata.getLabels.get("label1") === "label1-value")
+    assert(pod.getMetadata.getLabels.get("label2") === "label2-value")
+    assert(pod.getMetadata.getAnnotations.get("annotation1") === "annotation1-value")
+    assert(pod.getMetadata.getAnnotations.get("annotation2") === "annotation2-value")
+
+    val container = pod.getSpec.getContainers.get(0)
+    val envVars = container
+      .getEnv
+      .asScala
+      .map { env =>
+        (env.getName, env.getValue)
+      }
+      .toMap
+    assert(envVars("ENV1") === "VALUE1")
+    assert(envVars("ENV2") === "VALUE2")
+  }
+
+  private def createTestSecret(): Unit = {
+    testBackend.getKubernetesClient.secrets
+      .createNew()
+      .editOrNewMetadata()
+        .withName(TEST_SECRET_NAME)
+        .withNamespace(kubernetesTestComponents.namespace)
+        .endMetadata()
+      .addToStringData(TEST_SECRET_KEY, TEST_SECRET_VALUE)
+      .done()
+  }
+
+  private def checkTestSecret(pod: Pod, withInitContainer: Boolean = false): Unit = {
+    val testSecretVolume = pod.getSpec.getVolumes.asScala.filter { volume =>
+      volume.getName == s"$TEST_SECRET_NAME-volume"
+    }
+    assert(testSecretVolume.size === 1)
+    assert(testSecretVolume.head.getSecret.getSecretName === TEST_SECRET_NAME)
+
+    checkTestSecretInContainer(pod.getSpec.getContainers.get(0))
+
+    if (withInitContainer) {
+      checkTestSecretInContainer(pod.getSpec.getInitContainers.get(0))
+    }
+  }
+
+  private def checkTestSecretInContainer(container: Container): Unit = {
+    val testSecret = container.getVolumeMounts.asScala.filter { mount =>
+      mount.getName == s"$TEST_SECRET_NAME-volume"
+    }
+    assert(testSecret.size === 1)
+    assert(testSecret.head.getMountPath === TEST_SECRET_MOUNT_PATH)
   }
 }
 
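The checks above verify the mount only through pod metadata; inside a running container, each key of a mounted secret surfaces as a file under the mount path. A sketch of what reading it would look like, assuming the constants defined in the companion object below:

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// Sketch: with the secret mounted at /etc/secrets, the key "test-key"
// appears as a file whose contents are "test-data".
val secretValue = new String(
  Files.readAllBytes(Paths.get("/etc/secrets", "test-key")),
  StandardCharsets.UTF_8)
assert(secretValue == "test-data")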
@@ -161,5 +287,13 @@ private[spark] object KubernetesSuite {
     s"${SPARK_DISTRO_EXAMPLES_JAR_FILE.getName}"
   val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi"
 
+  val TEST_SECRET_NAME = "test-secret"
+  val TEST_SECRET_KEY = "test-key"
+  val TEST_SECRET_VALUE = "test-data"
+  val TEST_SECRET_MOUNT_PATH = "/etc/secrets"
+
+  val REMOTE_EXAMPLES_JAR_URI =
+    "https://storage.googleapis.com/spark-k8s-integration-tests/jars/spark-examples_2.11-2.3.0.jar"
+
   case object ShuffleNotReadyException extends Exception
 }