
Commit dd8d13b

[SPARK-53944][K8S] Support spark.kubernetes.executor.useDriverPodIP
### What changes were proposed in this pull request?

This PR aims to let `Spark Executor` pods use the `Spark Driver` pod IP instead of the `Spark Driver`'s K8s Service, in order to bypass K8s DNS issues.

### Why are the changes needed?

K8s DNS has a known issue and an official workaround via an `initContainer`, because K8s assumes a pod's IP can change when the pod restarts.

- https://kubernetes.io/docs/concepts/workloads/pods/init-containers/#init-containers-in-use

This (SPARK-53944) provides an additional option for users to choose IP over DNS in a K8s environment instead of the `initContainer` workaround, because the Apache Spark Driver is not supposed to be restarted in general. When a Spark Driver pod terminates for some reason, it is more natural to terminate all its executor pods and restart the whole Spark job from the beginning.

Since the `Driver` pod IP is automatically injected via `SPARK_DRIVER_BIND_ADDRESS`, this PR re-uses it.

https://github.com/apache/spark/blob/8499a62fb6b1ee51f82e65c4e449ec2eae6a0cc2/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala#L131-L133

https://github.com/apache/spark/blob/8499a62fb6b1ee51f82e65c4e449ec2eae6a0cc2/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh#L82-L86

### Does this PR introduce _any_ user-facing change?

No, this is disabled by default.

### How was this patch tested?

Passed the CI with the newly added test case. I also verified manually.

**spark.kubernetes.executor.useDriverPodIP=false**
```
... --driver-url spark://CoarseGrainedScheduler@pi-0-driver-svc.default.svc:7078 ...
```

**spark.kubernetes.executor.useDriverPodIP=true**
```
... --driver-url spark://CoarseGrainedScheduler@10.1.20.97:7078 ...
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52650 from dongjoon-hyun/SPARK-53944.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
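As a sketch of how a user might opt in, a hypothetical `spark-submit` invocation is shown below. Only the last `--conf` line is the option introduced by this PR; the application name, class, container image, and jar path are illustrative placeholders, not values from this change.

```shell
# Hypothetical spark-submit invocation; only spark.kubernetes.executor.useDriverPodIP
# comes from this PR. All other values are placeholders for illustration.
spark-submit \
  --master k8s://https://kubernetes.default.svc \
  --deploy-mode cluster \
  --name pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.kubernetes.container.image=apache/spark:4.1.0 \
  --conf spark.kubernetes.executor.useDriverPodIP=true \
  local:///opt/spark/examples/jars/spark-examples.jar
```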
1 parent 2c7bc89 commit dd8d13b

File tree

3 files changed (+27 lines, -1 line)


resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 7 additions & 0 deletions
```diff
@@ -120,6 +120,13 @@ private[spark] object Config extends Logging {
       .booleanConf
       .createWithDefault(false)

+  val KUBERNETES_EXECUTOR_USE_DRIVER_POD_IP =
+    ConfigBuilder("spark.kubernetes.executor.useDriverPodIP")
+      .doc("If true, executor pods use Driver pod IP directly instead of Driver Service.")
+      .version("4.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val KUBERNETES_NAMESPACE =
     ConfigBuilder("spark.kubernetes.namespace")
       .doc("The namespace that will be used for running the driver and executor pods.")
```

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala

Lines changed: 6 additions & 1 deletion
```diff
@@ -49,8 +49,13 @@ private[spark] class BasicExecutorFeatureStep(

   private val executorPodNamePrefix = kubernetesConf.resourceNamePrefix

+  private val driverAddress = if (kubernetesConf.get(KUBERNETES_EXECUTOR_USE_DRIVER_POD_IP)) {
+    kubernetesConf.get(DRIVER_BIND_ADDRESS)
+  } else {
+    kubernetesConf.get(DRIVER_HOST_ADDRESS)
+  }
   private val driverUrl = RpcEndpointAddress(
-    kubernetesConf.get(DRIVER_HOST_ADDRESS),
+    driverAddress,
     kubernetesConf.sparkConf.getInt(DRIVER_PORT.key, DEFAULT_DRIVER_PORT),
     CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
```
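The address selection in `BasicExecutorFeatureStep` can be sketched outside Spark as a plain conditional. The Service hostname, pod IP, and port below are illustrative values taken from the manual verification in the PR description, not computed by this script.

```shell
# Mirrors the address selection added in BasicExecutorFeatureStep:
# use the driver pod IP when the new flag is on, otherwise the driver
# Service DNS name (hypothetical values for illustration).
USE_DRIVER_POD_IP=true
DRIVER_SVC="pi-0-driver-svc.default.svc"  # spark.driver.host (K8s Service name)
DRIVER_POD_IP="10.1.20.97"                # injected via SPARK_DRIVER_BIND_ADDRESS
DRIVER_PORT=7078

if [ "$USE_DRIVER_POD_IP" = "true" ]; then
  DRIVER_ADDR="$DRIVER_POD_IP"
else
  DRIVER_ADDR="$DRIVER_SVC"
fi

# Same URL shape that RpcEndpointAddress(...).toString produces.
echo "spark://CoarseGrainedScheduler@${DRIVER_ADDR}:${DRIVER_PORT}"
```

With the flag off, the same script would print the Service-based URL, matching the `useDriverPodIP=false` output shown in the description.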

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala

Lines changed: 14 additions & 0 deletions
```diff
@@ -305,6 +305,20 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       ENV_EXECUTOR_ATTRIBUTE_EXECUTOR_ID -> KubernetesTestConf.EXECUTOR_ID))
   }

+  test("SPARK-53944: Support spark.kubernetes.executor.useDriverPodIP") {
+    Seq((false, "localhost"), (true, "bindAddress")).foreach {
+      case (flag, address) =>
+        val conf = baseConf.clone()
+          .set(DRIVER_BIND_ADDRESS, "bindAddress")
+          .set(KUBERNETES_EXECUTOR_USE_DRIVER_POD_IP, flag)
+        val kconf = KubernetesTestConf.createExecutorConf(sparkConf = conf)
+        val step = new BasicExecutorFeatureStep(kconf, new SecurityManager(conf), defaultProfile)
+        val executor = step.configurePod(SparkPod.initialPod())
+        checkEnv(executor, conf, Map(
+          ENV_DRIVER_URL -> s"spark://CoarseGrainedScheduler@$address:7098"))
+    }
+  }
+
   test("test executor pyspark memory") {
     baseConf.set("spark.kubernetes.resource.type", "python")
     baseConf.set(PYSPARK_EXECUTOR_MEMORY, 42L)
```
