@@ -32,7 +32,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite, SSLOptions}
 import org.apache.spark.deploy.k8s.SSLUtils
 import org.apache.spark.deploy.k8s.config._
 import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackendFactory
-import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.Minikube
+import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.{Minikube, MinikubeTestBackend}
 import org.apache.spark.deploy.k8s.integrationtest.constants.MINIKUBE_TEST_BACKEND
 import org.apache.spark.deploy.k8s.submit.{Client, ClientArguments, JavaMainAppResource, KeyAndCertPem, MainAppResource, PythonMainAppResource}
 import org.apache.spark.launcher.SparkLauncher
@@ -51,9 +51,13 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
     testBackend.initialize()
     kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient)
     resourceStagingServerLauncher = new ResourceStagingServerLauncher(
-      kubernetesTestComponents.kubernetesClient.inNamespace(kubernetesTestComponents.namespace))
+      kubernetesTestComponents
+        .kubernetesClient
+        .inNamespace(kubernetesTestComponents.namespace), testBackend.dockerImageTag())
     staticAssetServerLauncher = new StaticAssetServerLauncher(
-      kubernetesTestComponents.kubernetesClient.inNamespace(kubernetesTestComponents.namespace))
+      kubernetesTestComponents
+        .kubernetesClient
+        .inNamespace(kubernetesTestComponents.namespace), testBackend.dockerImageTag())
   }
 
   override def afterAll(): Unit = {
@@ -62,8 +66,9 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
 
   before {
     sparkConf = kubernetesTestComponents.newSparkConf()
-      .set(INIT_CONTAINER_DOCKER_IMAGE, s"spark-init:latest")
-      .set(DRIVER_DOCKER_IMAGE, s"spark-driver:latest")
+      .set(INIT_CONTAINER_DOCKER_IMAGE, tagImage("spark-init"))
+      .set(DRIVER_DOCKER_IMAGE, tagImage("spark-driver"))
+      .set(EXECUTOR_DOCKER_IMAGE, tagImage("spark-executor"))
       .set(s"${KUBERNETES_DRIVER_LABEL_PREFIX}spark-app-locator", APP_LOCATOR_LABEL)
     kubernetesTestComponents.createNamespace()
   }
@@ -73,14 +78,13 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
   }
 
   test("Run PySpark Job on file from SUBMITTER with --py-files") {
-    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+    assume(testBackend == MinikubeTestBackend)
 
     launchStagingServer(SSLOptions(), None)
     sparkConf
-      .set(DRIVER_DOCKER_IMAGE,
-        System.getProperty("spark.docker.test.driverImage", "spark-driver-py:latest"))
-      .set(EXECUTOR_DOCKER_IMAGE,
-        System.getProperty("spark.docker.test.executorImage", "spark-executor-py:latest"))
+      .set(DRIVER_DOCKER_IMAGE, tagImage("spark-driver-py"))
+      .set(EXECUTOR_DOCKER_IMAGE, tagImage("spark-executor-py"))
+
 
     runPySparkPiAndVerifyCompletion(
       PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION,
@@ -89,27 +93,25 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
   }
 
   test("Run PySpark Job on file from CONTAINER with spark.jar defined") {
-    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+    assume(testBackend == MinikubeTestBackend)
 
     sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH))
     sparkConf
-      .set(DRIVER_DOCKER_IMAGE,
-        System.getProperty("spark.docker.test.driverImage", "spark-driver-py:latest"))
-      .set(EXECUTOR_DOCKER_IMAGE,
-        System.getProperty("spark.docker.test.executorImage", "spark-executor-py:latest"))
+      .set(DRIVER_DOCKER_IMAGE, tagImage("spark-driver-py"))
+      .set(EXECUTOR_DOCKER_IMAGE, tagImage("spark-executor-py"))
 
     runPySparkPiAndVerifyCompletion(PYSPARK_PI_CONTAINER_LOCAL_FILE_LOCATION, Seq.empty[String])
   }
 
   test("Simple submission test with the resource staging server.") {
-    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+    assume(testBackend == MinikubeTestBackend)
 
     launchStagingServer(SSLOptions(), None)
     runSparkPiAndVerifyCompletion(SUBMITTER_LOCAL_MAIN_APP_RESOURCE)
   }
 
   test("Enable SSL on the resource staging server") {
-    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+    assume(testBackend == MinikubeTestBackend)
 
     val keyStoreAndTrustStore = SSLUtils.generateKeyStoreTrustStorePair(
       ipAddress = Minikube.getMinikubeIp,
@@ -136,14 +138,14 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
   }
 
   test("Use container-local resources without the resource staging server") {
-    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+    assume(testBackend == MinikubeTestBackend)
 
     sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH))
     runSparkPiAndVerifyCompletion(CONTAINER_LOCAL_MAIN_APP_RESOURCE)
   }
 
   test("Dynamic executor scaling basic test") {
-    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+    assume(testBackend == MinikubeTestBackend)
 
     launchStagingServer(SSLOptions(), None)
     createShuffleServiceDaemonSet()
@@ -163,7 +165,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
   }
 
   test("Use remote resources without the resource staging server.") {
-    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+    assume(testBackend == MinikubeTestBackend)
     val assetServerUri = staticAssetServerLauncher.launchStaticAssetServer()
     sparkConf.setJars(Seq(
       s"$assetServerUri/${EXAMPLES_JAR_FILE.getName}",
@@ -173,7 +175,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
   }
 
   test("Mix remote resources with submitted ones.") {
-    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+    assume(testBackend == MinikubeTestBackend)
     launchStagingServer(SSLOptions(), None)
     val assetServerUri = staticAssetServerLauncher.launchStaticAssetServer()
     sparkConf.setJars(Seq(
@@ -183,7 +185,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
   }
 
   test("Use key and certificate PEM files for TLS.") {
-    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+    assume(testBackend == MinikubeTestBackend)
     val keyAndCertificate = SSLUtils.generateKeyCertPemPair(Minikube.getMinikubeIp)
     launchStagingServer(
       SSLOptions(enabled = true),
@@ -195,7 +197,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
   }
 
   test("Use client key and client cert file when requesting executors") {
-    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+    assume(testBackend == MinikubeTestBackend)
     sparkConf.setJars(Seq(
       CONTAINER_LOCAL_MAIN_APP_RESOURCE,
       CONTAINER_LOCAL_HELPER_JAR_PATH))
@@ -212,7 +214,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
   }
 
   test("Added files should be placed in the driver's working directory.") {
-    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+    assume(testBackend == MinikubeTestBackend)
     launchStagingServer(SSLOptions(), None)
     val testExistenceFileTempDir = Utils.createTempDir(namePrefix = "test-existence-file-temp-dir")
     val testExistenceFile = new File(testExistenceFileTempDir, "input.txt")
@@ -230,7 +232,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
   }
 
   test("Setting JVM options on the driver and executors with spaces.") {
-    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+    assume(testBackend == MinikubeTestBackend)
     launchStagingServer(SSLOptions(), None)
     val driverJvmOptionsFile = storeJvmOptionsInTempFile(
       Map("simpleDriverConf" -> "simpleDriverConfValue",
@@ -260,7 +262,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
   }
 
   test("Submit small local files without the resource staging server.") {
-    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+    assume(testBackend == MinikubeTestBackend)
     sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH))
     val testExistenceFileTempDir = Utils.createTempDir(namePrefix = "test-existence-file-temp-dir")
     val testExistenceFile = new File(testExistenceFileTempDir, "input.txt")
@@ -278,15 +280,15 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
   }
 
   test("Use a very long application name.") {
-    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+    assume(testBackend == MinikubeTestBackend)
 
     sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH)).setAppName("long" * 40)
     runSparkPiAndVerifyCompletion(CONTAINER_LOCAL_MAIN_APP_RESOURCE)
   }
 
   private def launchStagingServer(
       resourceStagingServerSslOptions: SSLOptions, keyAndCertPem: Option[KeyAndCertPem]): Unit = {
-    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+    assume(testBackend == MinikubeTestBackend)
 
     val resourceStagingServerPort = resourceStagingServerLauncher.launchStagingServer(
       resourceStagingServerSslOptions, keyAndCertPem)
@@ -368,7 +370,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
               .endVolume()
               .addNewContainer()
                 .withName("shuffle")
-                .withImage("spark-shuffle:latest")
+                .withImage(s"spark-shuffle:${testBackend.dockerImageTag()}")
                 .withImagePullPolicy("IfNotPresent")
                 .addNewVolumeMount()
                   .withName("shuffle-dir")
@@ -404,6 +406,8 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
     }
     propertiesFile
   }
+
+  private def tagImage(image: String): String = s"$image:${testBackend.dockerImageTag()}"
 }
 
 private[spark] object KubernetesSuite {