This repository was archived by the owner on Jan 9, 2020. It is now read-only.
File tree Expand file tree Collapse file tree 4 files changed +58
-2
lines changed
resource-managers/kubernetes/core/src/main/scala/org/apache/spark
scheduler/cluster/kubernetes Expand file tree Collapse file tree 4 files changed +58
-2
lines changed Original file line number Diff line number Diff line change @@ -718,6 +718,20 @@ from the other deployment modes. See the [configuration page](configuration.html
718
718
Docker image pull policy used when pulling Docker images with Kubernetes.
719
719
</td >
720
720
</tr >
721
+ <tr >
722
+ <td ><code >spark.kubernetes.driver.limit.cores</code ></td >
723
+ <td >(none)</td >
724
+ <td >
725
+ Specify the hard cpu limit for the driver pod
726
+ </td >
727
+ </tr >
728
+ <tr >
729
+ <td ><code >spark.kubernetes.executor.limit.cores</code ></td >
730
+ <td >(none)</td >
731
+ <td >
732
+ Specify the hard cpu limit for a single executor pod
733
+ </td >
734
+ </tr >
721
735
</table >
722
736
723
737
Original file line number Diff line number Diff line change @@ -485,6 +485,18 @@ package object config extends Logging {
485
485
.stringConf
486
486
.createOptional
487
487
488
+ private [spark] val KUBERNETES_DRIVER_LIMIT_CORES =
489
+ ConfigBuilder (" spark.kubernetes.driver.limit.cores" )
490
+ .doc(" Specify the hard cpu limit for the driver pod" )
491
+ .stringConf
492
+ .createOptional
493
+
494
+ private [spark] val KUBERNETES_EXECUTOR_LIMIT_CORES =
495
+ ConfigBuilder (" spark.kubernetes.executor.limit.cores" )
496
+ .doc(" Specify the hard cpu limit for a single executor pod" )
497
+ .stringConf
498
+ .createOptional
499
+
488
500
private [spark] def resolveK8sMaster (rawMasterString : String ): String = {
489
501
if (! rawMasterString.startsWith(" k8s://" )) {
490
502
throw new IllegalArgumentException (" Master URL should start with k8s:// in Kubernetes mode." )
Original file line number Diff line number Diff line change @@ -64,6 +64,7 @@ private[spark] class Client(
64
64
65
65
// CPU settings
66
66
private val driverCpuCores = sparkConf.getOption(" spark.driver.cores" ).getOrElse(" 1" )
67
+ private val driverLimitCores = sparkConf.getOption(KUBERNETES_DRIVER_LIMIT_CORES .key)
67
68
68
69
// Memory settings
69
70
private val driverMemoryMb = sparkConf.get(org.apache.spark.internal.config.DRIVER_MEMORY )
@@ -139,7 +140,6 @@ private[spark] class Client(
139
140
.endEnv()
140
141
.withNewResources()
141
142
.addToRequests(" cpu" , driverCpuQuantity)
142
- .addToLimits(" cpu" , driverCpuQuantity)
143
143
.addToRequests(" memory" , driverMemoryQuantity)
144
144
.addToLimits(" memory" , driverMemoryLimitQuantity)
145
145
.endResources()
@@ -156,6 +156,21 @@ private[spark] class Client(
156
156
.addToContainers(driverContainer)
157
157
.endSpec()
158
158
159
+ driverLimitCores.map {
160
+ limitCores =>
161
+ val driverCpuLimitQuantity = new QuantityBuilder (false )
162
+ .withAmount(limitCores)
163
+ .build()
164
+ basePod
165
+ .editSpec()
166
+ .editFirstContainer()
167
+ .editResources
168
+ .addToLimits(" cpu" , driverCpuLimitQuantity)
169
+ .endResources()
170
+ .endContainer()
171
+ .endSpec()
172
+ }
173
+
159
174
val maybeSubmittedResourceIdentifiers = initContainerComponentsProvider
160
175
.provideInitContainerSubmittedDependencyUploader(allDriverLabels)
161
176
.map { uploader =>
Original file line number Diff line number Diff line change @@ -108,6 +108,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
108
108
private val executorMemoryWithOverhead = executorMemoryMb + memoryOverheadMb
109
109
110
110
private val executorCores = conf.getOption(" spark.executor.cores" ).getOrElse(" 1" )
111
+ private val executorLimitCores = conf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES .key)
111
112
112
113
private implicit val requestExecutorContext = ExecutionContext .fromExecutorService(
113
114
ThreadUtils .newDaemonCachedThreadPool(" kubernetes-executor-requests" ))
@@ -438,14 +439,28 @@ private[spark] class KubernetesClusterSchedulerBackend(
438
439
.addToRequests(" memory" , executorMemoryQuantity)
439
440
.addToLimits(" memory" , executorMemoryLimitQuantity)
440
441
.addToRequests(" cpu" , executorCpuQuantity)
441
- .addToLimits(" cpu" , executorCpuQuantity)
442
442
.endResources()
443
443
.addAllToEnv(requiredEnv.asJava)
444
444
.addToEnv(executorExtraClasspathEnv.toSeq: _* )
445
445
.withPorts(requiredPorts.asJava)
446
446
.endContainer()
447
447
.endSpec()
448
448
449
+ executorLimitCores.map {
450
+ limitCores =>
451
+ val executorCpuLimitQuantity = new QuantityBuilder (false )
452
+ .withAmount(limitCores)
453
+ .build()
454
+ basePodBuilder
455
+ .editSpec()
456
+ .editFirstContainer()
457
+ .editResources
458
+ .addToLimits(" cpu" , executorCpuLimitQuantity)
459
+ .endResources()
460
+ .endContainer()
461
+ .endSpec()
462
+ }
463
+
449
464
val withMaybeShuffleConfigPodBuilder = shuffleServiceConfig
450
465
.map { config =>
451
466
config.shuffleDirs.foldLeft(basePodBuilder) { (builder, dir) =>
You can’t perform that action at this time.
0 commit comments