 */
 package org.apache.spark.scheduler.cluster.armada

-import java.util.concurrent.{ScheduledExecutorService}
+import java.util.concurrent.ScheduledExecutorService

 import scala.collection.mutable.HashMap

+import io.armadaproject.armada.ArmadaClient
+import k8s.io.api.core.v1.generated.{Container, PodSpec, ResourceRequirements}
+import k8s.io.api.core.v1.generated.{EnvVar, EnvVarSource, ObjectFieldSelector}
+import k8s.io.apimachinery.pkg.api.resource.generated.Quantity
+
 import org.apache.spark.SparkContext
 import org.apache.spark.rpc.{RpcAddress, RpcCallContext}
 import org.apache.spark.scheduler.{ExecutorDecommission, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}

-// FIXME: Actually import ArmadaClient
-class ArmadaClient {}

 // TODO: Implement for Armada
 private[spark] class ArmadaClusterSchedulerBackend(
     scheduler: TaskSchedulerImpl,
     sc: SparkContext,
-    armadaClient: ArmadaClient,
-    executorService: ScheduledExecutorService)
+    executorService: ScheduledExecutorService,
+    masterURL: String)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {

   // FIXME
@@ -45,7 +48,78 @@ private[spark] class ArmadaClusterSchedulerBackend(
     conf.getOption("spark.app.id").getOrElse(appId)
   }

-  override def start(): Unit = {}
+
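+  // Submits a single, statically configured executor pod to Armada using the
+  // host and port parsed from masterURL. The image, resources and env var
+  // values below are hard-coded placeholders for now.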
+  private def submitJob(): Unit = {
+    // masterURL has the form "<scheme>://<host>:<port>"; splitting on ":" leaves
+    // a leading "//" on the host component, which is stripped here.
+    val urlArray = masterURL.split(":")
+    val host = if (urlArray(1).startsWith("/")) urlArray(1).substring(2) else urlArray(1)
+    val port = urlArray(2).toInt
+
+    val driverAddr = sys.env("SPARK_DRIVER_BIND_ADDRESS")
+
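+    // RPC endpoint of this driver; the port is currently hard-coded to 7078.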
+    val driverURL = s"spark://CoarseGrainedScheduler@$driverAddr:7078"
+    val source = EnvVarSource().withFieldRef(ObjectFieldSelector()
+      .withApiVersion("v1").withFieldPath("status.podIP"))
+    val envVars = Seq(
+      EnvVar().withName("SPARK_EXECUTOR_ID").withValue("1"),
+      EnvVar().withName("SPARK_RESOURCE_PROFILE_ID").withValue("0"),
+      EnvVar().withName("SPARK_EXECUTOR_POD_NAME").withValue("test-pod-name"),
+      EnvVar().withName("SPARK_APPLICATION_ID").withValue("test_spark_app_id"),
+      EnvVar().withName("SPARK_EXECUTOR_CORES").withValue("1"),
+      EnvVar().withName("SPARK_EXECUTOR_MEMORY").withValue("512m"),
+      EnvVar().withName("SPARK_DRIVER_URL").withValue(driverURL),
+      EnvVar().withName("SPARK_EXECUTOR_POD_IP").withValueFrom(source)
+    )
+    val executorContainer = Container()
+      .withName("spark-executor")
+      .withImagePullPolicy("IfNotPresent")
+      .withImage("spark:testing")
+      .withEnv(envVars)
+      .withCommand(Seq("/opt/entrypoint.sh"))
+      .withArgs(Seq("executor"))
+      .withResources(
+        ResourceRequirements(
+          limits = Map(
+            "memory" -> Quantity(Option("1000Mi")),
+            "cpu" -> Quantity(Option("100m"))
+          ),
+          requests = Map(
+            "memory" -> Quantity(Option("1000Mi")),
+            "cpu" -> Quantity(Option("100m"))
+          )
+        )
+      )
+
+    val podSpec = PodSpec()
+      .withTerminationGracePeriodSeconds(0)
+      .withRestartPolicy("Never")
+      .withContainers(Seq(executorContainer))
+
+    val testJob = api.submit
+      .JobSubmitRequestItem()
+      .withPriority(0)
+      .withNamespace("default")
+      .withPodSpec(podSpec)
+
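+    // Submit the executor job to the Armada server parsed from masterURL and
+    // log the job IDs (and any errors) returned for each submitted item.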
+    val jobSubmitResponse = ArmadaClient(host, port)
+      .SubmitJobs("test", "executor", Seq(testJob))
+
+    logInfo("Driver Job Submit Response")
+    for (respItem <- jobSubmitResponse.jobResponseItems) {
+      logInfo(s"JobID: ${respItem.jobId} Error: ${respItem.error}")
+    }
+  }
+  override def start(): Unit = {
+    submitJob()
+  }
+
   override def stop(): Unit = {}

   /*
@@ -100,7 +174,7 @@ private[spark] class ArmadaClusterSchedulerBackend(
         execIDRequester -= rpcAddress
         // Expected, executors re-establish a connection with an ID
       case _ =>
-        logDebug(s"No executor found for ${rpcAddress}")
+        logDebug(s"No executor found for $rpcAddress")
     }
   }
 }