
Commit 08f64b4

liyinan926 authored and Robert Kruszewski committed
[SPARK-23285][K8S] Add a config property for specifying physical executor cores
## What changes were proposed in this pull request?

As mentioned in SPARK-23285, this PR introduces a new configuration property `spark.kubernetes.executor.request.cores` for specifying the physical CPU cores requested for each executor pod. This avoids changing the semantics of `spark.executor.cores` and `spark.task.cpus` and their role in task scheduling, task parallelism, dynamic resource allocation, etc. The new configuration property only determines the physical CPU cores available to an executor. An executor can still run multiple tasks simultaneously by using appropriate values for `spark.executor.cores` and `spark.task.cpus`.

## How was this patch tested?

Unit tests.

felixcheung srowen jiangxb1987 jerryshao mccheah foxish

Author: Yinan Li <[email protected]>
Author: Yinan Li <[email protected]>

Closes apache#20553 from liyinan926/master.
1 parent 60e1bd6 commit 08f64b4
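For context, here is a minimal sketch of how the new property is meant to sit alongside the existing scheduling properties. This example is illustrative only and is not part of the commit; the specific values are assumptions.

```scala
import org.apache.spark.SparkConf

// Illustrative only: request half a physical CPU per executor pod from
// Kubernetes, while task scheduling remains driven by spark.executor.cores
// and spark.task.cpus as before.
val conf = new SparkConf()
  .set("spark.kubernetes.executor.request.cores", "500m") // pod cpu request, Kubernetes units
  .set("spark.executor.cores", "4") // scheduler slots per executor, semantics unchanged
  .set("spark.task.cpus", "1")      // cores reserved per task, semantics unchanged
// Each executor can still run spark.executor.cores / spark.task.cpus = 4 tasks
// concurrently, independent of the 500m pod-level cpu request.
```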

File tree

4 files changed: +53 −7 lines

docs/running-on-kubernetes.md

Lines changed: 12 additions & 3 deletions
```diff
@@ -549,14 +549,23 @@ specific to Spark on Kubernetes.
   <td><code>spark.kubernetes.driver.limit.cores</code></td>
   <td>(none)</td>
   <td>
-    Specify the hard CPU [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for the driver pod.
+    Specify a hard cpu [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for the driver pod.
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.executor.request.cores</code></td>
+  <td>(none)</td>
+  <td>
+    Specify the cpu request for each executor pod. Values conform to the Kubernetes [convention](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#meaning-of-cpu).
+    Example values include 0.1, 500m, 1.5, 5, etc., with the definition of cpu units documented in [CPU units](https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/#cpu-units).
+    This is distinct from <code>spark.executor.cores</code>: it is only used and takes precedence over <code>spark.executor.cores</code> for specifying the executor pod cpu request if set. Task
+    parallelism, e.g., number of tasks an executor can run concurrently is not affected by this.
+</tr>
 <tr>
   <td><code>spark.kubernetes.executor.limit.cores</code></td>
   <td>(none)</td>
   <td>
-    Specify the hard CPU [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for each executor pod launched for the Spark Application.
+    Specify a hard cpu [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for each executor pod launched for the Spark Application.
   </td>
 </tr>
 <tr>
@@ -593,4 +602,4 @@ specific to Spark on Kubernetes.
   <code>spark.kubernetes.executor.secrets.spark-secret=/etc/secrets</code>.
   </td>
 </tr>
-</table>
+</table>
```
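As a small aside on the Kubernetes cpu units the new docs reference: "0.1" and "100m" (100 millicpu) denote the same amount, and both are valid values for `spark.kubernetes.executor.request.cores`. The sketch below builds them with the fabric8 `QuantityBuilder` that this commit's pod factory already uses; it is illustrative, not part of the commit.

```scala
import io.fabric8.kubernetes.api.model.QuantityBuilder

// "0.1" cpu == "100m" (1 cpu == 1000m). Quantities are built the same
// way ExecutorPodFactory builds the executor cpu request below.
val tenthOfACpu = new QuantityBuilder(false).withAmount("0.1").build()
val hundredMillicpu = new QuantityBuilder(false).withAmount("100m").build()
```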

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 6 additions & 0 deletions
```diff
@@ -91,6 +91,12 @@ private[spark] object Config extends Logging {
       .stringConf
       .createOptional
 
+  val KUBERNETES_EXECUTOR_REQUEST_CORES =
+    ConfigBuilder("spark.kubernetes.executor.request.cores")
+      .doc("Specify the cpu request for each executor pod")
+      .stringConf
+      .createOptional
+
   val KUBERNETES_DRIVER_POD_NAME =
     ConfigBuilder("spark.kubernetes.driver.pod.name")
       .doc("Name of the driver pod.")
```

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala

Lines changed: 8 additions & 4 deletions
```diff
@@ -85,7 +85,12 @@ private[spark] class ExecutorPodFactory(
       MEMORY_OVERHEAD_MIN_MIB))
   private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
 
-  private val executorCores = sparkConf.getDouble("spark.executor.cores", 1)
+  private val executorCores = sparkConf.getInt("spark.executor.cores", 1)
+  private val executorCoresRequest = if (sparkConf.contains(KUBERNETES_EXECUTOR_REQUEST_CORES)) {
+    sparkConf.get(KUBERNETES_EXECUTOR_REQUEST_CORES).get
+  } else {
+    executorCores.toString
+  }
   private val executorLimitCores = sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
 
   /**
@@ -113,7 +118,7 @@
       .withAmount(s"${executorMemoryWithOverhead}Mi")
       .build()
     val executorCpuQuantity = new QuantityBuilder(false)
-      .withAmount(executorCores.toString)
+      .withAmount(executorCoresRequest)
       .build()
     val executorExtraClasspathEnv = executorExtraClasspath.map { cp =>
       new EnvVarBuilder()
@@ -132,8 +137,7 @@
     }.getOrElse(Seq.empty[EnvVar])
     val executorEnv = (Seq(
       (ENV_DRIVER_URL, driverUrl),
-      // Executor backend expects integral value for executor cores, so round it up to an int.
-      (ENV_EXECUTOR_CORES, math.ceil(executorCores).toInt.toString),
+      (ENV_EXECUTOR_CORES, executorCores.toString),
       (ENV_EXECUTOR_MEMORY, executorMemoryString),
       (ENV_APPLICATION_ID, applicationId),
       // This is to set the SPARK_CONF_DIR to be /opt/spark/conf
```
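The net effect of the change above is a simple precedence rule for the pod's cpu request: an explicit `spark.kubernetes.executor.request.cores` wins, otherwise the request falls back to `spark.executor.cores`. A self-contained sketch of that rule (the helper name is illustrative, not from the commit):

```scala
// Hypothetical helper mirroring the precedence introduced above.
def resolveExecutorCpuRequest(
    requestCores: Option[String], // spark.kubernetes.executor.request.cores, if set
    executorCores: Int            // spark.executor.cores (defaults to 1 in Spark)
  ): String =
  requestCores.getOrElse(executorCores.toString)

assert(resolveExecutorCpuRequest(Some("100m"), 4) == "100m") // explicit request wins
assert(resolveExecutorCpuRequest(None, 4) == "4")            // fallback to executor cores
```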

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala

Lines changed: 27 additions & 0 deletions
```diff
@@ -86,6 +86,33 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
     checkOwnerReferences(executor, driverPodUid)
   }
 
+  test("executor core request specification") {
+    var factory = new ExecutorPodFactory(baseConf, None)
+    var executor = factory.createExecutorPod(
+      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
+    assert(executor.getSpec.getContainers.size() === 1)
+    assert(executor.getSpec.getContainers.get(0).getResources.getRequests.get("cpu").getAmount
+      === "1")
+
+    val conf = baseConf.clone()
+
+    conf.set(KUBERNETES_EXECUTOR_REQUEST_CORES, "0.1")
+    factory = new ExecutorPodFactory(conf, None)
+    executor = factory.createExecutorPod(
+      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
+    assert(executor.getSpec.getContainers.size() === 1)
+    assert(executor.getSpec.getContainers.get(0).getResources.getRequests.get("cpu").getAmount
+      === "0.1")
+
+    conf.set(KUBERNETES_EXECUTOR_REQUEST_CORES, "100m")
+    factory = new ExecutorPodFactory(conf, None)
+    conf.set(KUBERNETES_EXECUTOR_REQUEST_CORES, "100m")
+    executor = factory.createExecutorPod(
+      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
+    assert(executor.getSpec.getContainers.get(0).getResources.getRequests.get("cpu").getAmount
+      === "100m")
+  }
+
   test("executor pod hostnames get truncated to 63 characters") {
     val conf = baseConf.clone()
     conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX,
```
