@@ -161,6 +161,14 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
161
161
})
162
162
}
163
163
164
+ test(" Run PageRank using remote data file" ) {
165
+ sparkAppConf
166
+ .set(" spark.kubernetes.mountDependencies.filesDownloadDir" , CONTAINER_LOCAL_FILE_DOWNLOAD_PATH )
167
+ .set(" spark.files" , REMOTE_PAGE_RANK_DATA_FILE )
168
+ .set(" spark.kubernetes.initContainer.image" , initContainerImage)
169
+ runSparkPageRankAndVerifyCompletion(appArgs = Array (CONTAINER_LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE ))
170
+ }
171
+
164
172
private def runSparkPiAndVerifyCompletion (
165
173
appResource : String = CONTAINER_LOCAL_SPARK_DISTRO_EXAMPLES_JAR ,
166
174
driverPodChecker : Pod => Unit = doBasicDriverPodCheck,
@@ -175,6 +183,20 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
175
183
executorPodChecker)
176
184
}
177
185
186
+ private def runSparkPageRankAndVerifyCompletion (
187
+ appResource : String = CONTAINER_LOCAL_SPARK_DISTRO_EXAMPLES_JAR ,
188
+ driverPodChecker : Pod => Unit = doBasicDriverPodCheck,
189
+ executorPodChecker : Pod => Unit = doBasicExecutorPodCheck,
190
+ appArgs : Array [String ]): Unit = {
191
+ runSparkApplicationAndVerifyCompletion(
192
+ appResource,
193
+ SPARK_PAGE_RANK_MAIN_CLASS ,
194
+ Seq (" 1 has rank" , " 2 has rank" , " 3 has rank" , " 4 has rank" ),
195
+ appArgs,
196
+ driverPodChecker,
197
+ executorPodChecker)
198
+ }
199
+
178
200
private def runSparkApplicationAndVerifyCompletion (
179
201
appResource : String ,
180
202
mainClass : String ,
@@ -291,14 +313,22 @@ private[spark] object KubernetesSuite {
291
313
val CONTAINER_LOCAL_SPARK_DISTRO_EXAMPLES_JAR : String = s " local:///opt/spark/examples/jars/ " +
292
314
s " ${SPARK_DISTRO_EXAMPLES_JAR_FILE .getName}"
293
315
val SPARK_PI_MAIN_CLASS : String = " org.apache.spark.examples.SparkPi"
316
+ val SPARK_PAGE_RANK_MAIN_CLASS : String = " org.apache.spark.examples.SparkPageRank"
294
317
295
318
val TEST_SECRET_NAME = " test-secret"
296
319
val TEST_SECRET_KEY = " test-key"
297
320
val TEST_SECRET_VALUE = " test-data"
298
321
val TEST_SECRET_MOUNT_PATH = " /etc/secrets"
299
322
323
+ val CONTAINER_LOCAL_FILE_DOWNLOAD_PATH = " /var/spark-data/spark-files"
324
+
300
325
val REMOTE_EXAMPLES_JAR_URI =
301
326
" https://storage.googleapis.com/spark-k8s-integration-tests/jars/spark-examples_2.11-2.3.0.jar"
302
327
328
+ val REMOTE_PAGE_RANK_DATA_FILE =
329
+ " https://storage.googleapis.com/spark-k8s-integration-tests/files/pagerank_data.txt"
330
+ val CONTAINER_LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE =
331
+ s " $CONTAINER_LOCAL_FILE_DOWNLOAD_PATH/pagerank_data.txt "
332
+
303
333
case object ShuffleNotReadyException extends Exception
304
334
}
0 commit comments