
Commit 8ea3cf0

mccheah authored and robert3005 committed
Add back executor id to retrieving Spark application configuration. (apache-spark-on-k8s#224)
1 parent 6fb0b99 commit 8ea3cf0

4 files changed (+6, -7 lines)

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 1 addition & 1 deletion

@@ -199,7 +199,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       new SecurityManager(executorConf),
       clientMode = true)
     val driver = fetcher.setupEndpointRefByURI(driverUrl)
-    val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
+    val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig(executorId))
     val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
     fetcher.shutdown()
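
For context, the executor-side flow this hunk touches can be modeled in a few lines of standalone Scala. DriverRef and the property values below are simplified stand-ins for Spark's private RPC machinery, not the real API; only the shape of the ask is taken from the diff above.

object ConfigFetchSketch {
  // Stand-ins for the private messages in CoarseGrainedClusterMessages.
  case class RetrieveSparkAppConfig(executorId: String)
  case class SparkAppConfig(
      sparkProperties: Seq[(String, String)],
      ioEncryptionKey: Option[Array[Byte]])

  // Stand-in for the RpcEndpointRef obtained from setupEndpointRefByURI.
  trait DriverRef {
    def askSync(msg: RetrieveSparkAppConfig): SparkAppConfig
  }

  // Mirrors the changed line: the executor's own id now travels with the request.
  def fetchProps(driver: DriverRef, executorId: String, appId: String): Seq[(String, String)] = {
    val cfg = driver.askSync(RetrieveSparkAppConfig(executorId))
    cfg.sparkProperties ++ Seq(("spark.app.id", appId))
  }

  def main(args: Array[String]): Unit = {
    val driver = new DriverRef {
      def askSync(msg: RetrieveSparkAppConfig): SparkAppConfig =
        SparkAppConfig(Seq(("spark.executor.memory", "1g")), None)
    }
    println(fetchProps(driver, executorId = "1", appId = "app-example"))
  }
}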

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala

Lines changed: 1 addition & 1 deletion

@@ -28,7 +28,7 @@ private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
 
 private[spark] object CoarseGrainedClusterMessages {
 
-  case object RetrieveSparkAppConfig extends CoarseGrainedClusterMessage
+  case class RetrieveSparkAppConfig(executorId: String) extends CoarseGrainedClusterMessage
 
   case class SparkAppConfig(
       sparkProperties: Seq[(String, String)],
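
A minimal sketch of what this message-shape change means for callers, assuming a simplified sealed trait in place of Spark's private CoarseGrainedClusterMessages: the old case object carried no payload, while the new case class lets pattern matches either bind the executor id or discard it with a wildcard.

object MessageShapeSketch {
  // Simplified stand-in for CoarseGrainedClusterMessage and its companion object.
  sealed trait ClusterMessage extends Serializable
  case class RetrieveSparkAppConfig(executorId: String) extends ClusterMessage
  case class SparkAppConfig(sparkProperties: Seq[(String, String)]) extends ClusterMessage

  // Handlers that do not need the id match with a wildcard; the Kubernetes
  // backend binds it (see the last file in this commit).
  def describe(msg: ClusterMessage): String = msg match {
    case RetrieveSparkAppConfig(id) => s"config requested by executor $id"
    case SparkAppConfig(props)      => s"config reply carrying ${props.size} properties"
  }

  def main(args: Array[String]): Unit = {
    println(describe(RetrieveSparkAppConfig("3")))
    println(describe(SparkAppConfig(Seq(("spark.app.id", "app-1")))))
  }
}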

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 1 addition & 1 deletion

@@ -223,7 +223,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         removeWorker(workerId, host, message)
         context.reply(true)
 
-      case RetrieveSparkAppConfig =>
+      case RetrieveSparkAppConfig(_) =>
         val reply = SparkAppConfig(sparkProperties,
           SparkEnv.get.securityManager.getIOEncryptionKey())
         context.reply(reply)
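
The default driver-side handling can likewise be sketched standalone. RpcContext below is a hypothetical stand-in for Spark's RpcCallContext; the point is only that the generic backend accepts the new message shape, ignores the id via the wildcard, and replies with the same configuration for every executor.

object DriverHandlerSketch {
  case class RetrieveSparkAppConfig(executorId: String)
  case class SparkAppConfig(
      sparkProperties: Seq[(String, String)],
      ioEncryptionKey: Option[Array[Byte]])

  // Hypothetical stand-in for Spark's RpcCallContext.
  trait RpcContext { def reply(response: Any): Unit }

  // The generic backend matches RetrieveSparkAppConfig(_) and discards the id.
  def receiveAndReply(
      sparkProperties: Seq[(String, String)],
      ioKey: Option[Array[Byte]]): PartialFunction[(Any, RpcContext), Unit] = {
    case (RetrieveSparkAppConfig(_), context) =>
      context.reply(SparkAppConfig(sparkProperties, ioKey))
  }

  def main(args: Array[String]): Unit = {
    val handler = receiveAndReply(Seq(("spark.driver.port", "7077")), None)
    val printingContext = new RpcContext {
      def reply(response: Any): Unit = println(response)
    }
    handler((RetrieveSparkAppConfig("0"), printingContext))
  }
}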

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala

Lines changed: 3 additions & 4 deletions

@@ -554,21 +554,20 @@ private[spark] class KubernetesClusterSchedulerBackend(
     new PartialFunction[Any, Unit]() {
       override def isDefinedAt(msg: Any): Boolean = {
         msg match {
-          case RetrieveSparkAppConfig =>
+          case RetrieveSparkAppConfig(_) =>
             Utils.isDynamicAllocationEnabled(sc.conf)
           case _ => false
         }
       }
 
       override def apply(msg: Any): Unit = {
         msg match {
-          case RetrieveSparkAppConfig =>
+          case RetrieveSparkAppConfig(executorId) =>
             RUNNING_EXECUTOR_PODS_LOCK.synchronized {
               var resolvedProperties = sparkProperties
               val runningExecutorPod = kubernetesClient
                 .pods()
-                .withName(runningExecutorPods(currentExecutorIdCounter.toString)
-                  .getMetadata.getName)
+                .withName(runningExecutorPods(executorId).getMetadata.getName)
                 .get()
               val nodeName = runningExecutorPod.getSpec.getNodeName
               val shufflePodIp = shufflePodCache.get.getShufflePodForExecutor(nodeName)
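
The Kubernetes backend is the one consumer that actually uses the id: it resolves the pod of the executor that sent the request instead of whatever pod the shared currentExecutorIdCounter happens to point at. A rough standalone sketch of that lookup, with Pod and the mutable map standing in for the fabric8 client model and runningExecutorPods:

object PodLookupSketch {
  // Simplified stand-ins for the fabric8 Pod model and runningExecutorPods.
  final case class Pod(name: String, nodeName: String)

  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
  private val runningExecutorPods = scala.collection.mutable.Map.empty[String, Pod]

  def registerPod(executorId: String, pod: Pod): Unit =
    RUNNING_EXECUTOR_PODS_LOCK.synchronized { runningExecutorPods(executorId) = pod }

  // The fix: look up the pod of the requesting executor by its own id, rather
  // than through a shared counter that may already point at a newer executor.
  def nodeFor(executorId: String): Option[String] =
    RUNNING_EXECUTOR_PODS_LOCK.synchronized {
      runningExecutorPods.get(executorId).map(_.nodeName)
    }

  def main(args: Array[String]): Unit = {
    registerPod("1", Pod("exec-1", "node-a"))
    registerPod("2", Pod("exec-2", "node-b"))
    println(nodeFor("1")) // Some(node-a), even though executor 2 registered later
  }
}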
