Skip to content

Commit 3df45b2

Browse files
committed
Added a test case using PageRank with a remote data file
1 parent 83544dd commit 3df45b2

File tree

1 file changed

+29
-0
lines changed

1 file changed

+29
-0
lines changed

integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,14 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
161161
})
162162
}
163163

164+
test("Run PageRank using remote data file") {
165+
sparkAppConf
166+
.set("spark.kubernetes.mountDependencies.filesDownloadDir", FILE_DOWNLOAD_PATH)
167+
.set("spark.files", REMOTE_PAGE_RANK_DATA_FILE)
168+
.set("spark.kubernetes.initContainer.image", initContainerImage)
169+
runSparkPageRankAndVerifyCompletion(appArgs = Array(LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE))
170+
}
171+
164172
private def runSparkPiAndVerifyCompletion(
165173
appResource: String = CONTAINER_LOCAL_SPARK_DISTRO_EXAMPLES_JAR,
166174
driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
@@ -175,6 +183,20 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
175183
executorPodChecker)
176184
}
177185

186+
private def runSparkPageRankAndVerifyCompletion(
187+
appResource: String = CONTAINER_LOCAL_SPARK_DISTRO_EXAMPLES_JAR,
188+
driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
189+
executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
190+
appArgs: Array[String]): Unit = {
191+
runSparkApplicationAndVerifyCompletion(
192+
appResource,
193+
SPARK_PAGE_RANK_MAIN_CLASS,
194+
Seq("1 has rank", "2 has rank", "3 has rank", "4 has rank"),
195+
appArgs,
196+
driverPodChecker,
197+
executorPodChecker)
198+
}
199+
178200
private def runSparkApplicationAndVerifyCompletion(
179201
appResource: String,
180202
mainClass: String,
@@ -291,14 +313,21 @@ private[spark] object KubernetesSuite {
291313
val CONTAINER_LOCAL_SPARK_DISTRO_EXAMPLES_JAR: String = s"local:///opt/spark/examples/jars/" +
292314
s"${SPARK_DISTRO_EXAMPLES_JAR_FILE.getName}"
293315
val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi"
316+
val SPARK_PAGE_RANK_MAIN_CLASS: String = "org.apache.spark.examples.SparkPageRank"
294317

295318
val TEST_SECRET_NAME = "test-secret"
296319
val TEST_SECRET_KEY = "test-key"
297320
val TEST_SECRET_VALUE = "test-data"
298321
val TEST_SECRET_MOUNT_PATH = "/etc/secrets"
299322

323+
val FILE_DOWNLOAD_PATH = "/var/spark-data/spark-files"
324+
300325
val REMOTE_EXAMPLES_JAR_URI =
301326
"https://storage.googleapis.com/spark-k8s-integration-tests/jars/spark-examples_2.11-2.3.0.jar"
302327

328+
val REMOTE_PAGE_RANK_DATA_FILE =
329+
"https://storage.googleapis.com/spark-k8s-integration-tests/files/pagerank_data.txt"
330+
val LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE = s"$FILE_DOWNLOAD_PATH/pagerank_data.txt"
331+
303332
case object ShuffleNotReadyException extends Exception
304333
}

0 commit comments

Comments
 (0)