From 41e9d3a07932ab73caf2cee8da301bd51e361876 Mon Sep 17 00:00:00 2001
From: Andrew Ash
Date: Tue, 5 Sep 2017 22:08:09 -0700
Subject: [PATCH] Additional config settings for the memory overhead factor

I'm seeing the default value of 0.10 fail for even reasonably-sized shuffle
jobs so expect this value to require some tuning to reliably succeed.

We copied this default value from YARN but it appears that kubernetes is more
strict on enforcing memory limits on containers than YARN has been: I have two
identically configured clusters of five AWS r3.4xls, one running YARN and the
other running kubernetes, with identical driver/executor settings, running
identical jobs, and the YARN job succeeds whereas the k8s job fails due to the
pod exceeding its memory limit.
---
 .../spark/deploy/kubernetes/config.scala           | 18 ++++++++++++++++++
 .../spark/deploy/kubernetes/constants.scala        |  1 -
 .../BaseDriverConfigurationStep.scala              |  7 ++++---
 .../KubernetesClusterSchedulerBackend.scala        |  3 ++-
 4 files changed, 24 insertions(+), 5 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
index 53a184cba7a4d..58d6210e8b30d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
@@ -93,6 +93,15 @@ package object config extends Logging {
       .stringConf
       .createOptional
 
+  private[spark] val KUBERNETES_EXECUTOR_MEMORY_OVERHEAD_FACTOR =
+    ConfigBuilder("spark.kubernetes.executor.memoryOverheadFactor")
+      .doc("The additional percentage of the executor heap to request from Kubernetes for pods" +
+        " memory limits, e.g. 0.10 for 10% overhead. This is overridden by" +
+        " spark.kubernetes.executor.memoryOverhead")
+      .doubleConf
+      .checkValue(_ > 0, "Overhead factors must be positive")
+      .createWithDefault(0.1)
+
   // Note that while we set a default for this when we start up the
   // scheduler, the specific default value is dynamically determined
   // based on the executor memory.
@@ -113,6 +122,15 @@ package object config extends Logging {
       .bytesConf(ByteUnit.MiB)
       .createOptional
 
+  private[spark] val KUBERNETES_DRIVER_MEMORY_OVERHEAD_FACTOR =
+    ConfigBuilder("spark.kubernetes.driver.memoryOverheadFactor")
+      .doc("The additional percentage of the driver heap to request from Kubernetes for pod" +
+        " memory limits, e.g. 0.10 for 10% overhead. This is overridden by" +
+        " spark.kubernetes.driver.memoryOverhead")
+      .doubleConf
+      .checkValue(_ > 0, "Overhead factors must be positive")
+      .createWithDefault(0.1)
+
   private[spark] val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
   private[spark] val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
   private[spark] val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
index 2c2ccf31b9dd9..3e882ac39b1f7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
@@ -100,6 +100,5 @@ package object constants {
   private[spark] val ANNOTATION_EXECUTOR_NODE_AFFINITY = "scheduler.alpha.kubernetes.io/affinity"
   private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
   private[spark] val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
-  private[spark] val MEMORY_OVERHEAD_FACTOR = 0.10
   private[spark] val MEMORY_OVERHEAD_MIN_MIB = 384L
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStep.scala
index b76c77a656d2d..748d9f36fdd0d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStep.scala
@@ -52,9 +52,10 @@ private[spark] class BaseDriverConfigurationStep(
     org.apache.spark.internal.config.DRIVER_MEMORY.key,
     org.apache.spark.internal.config.DRIVER_MEMORY.defaultValueString)
   private val memoryOverheadMiB = submissionSparkConf
-    .get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
-    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt,
-      MEMORY_OVERHEAD_MIN_MIB))
+    .get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
+    .getOrElse(math.max(
+      (submissionSparkConf.get(KUBERNETES_DRIVER_MEMORY_OVERHEAD_FACTOR) * driverMemoryMiB).toInt,
+      MEMORY_OVERHEAD_MIN_MIB))
   private val driverContainerMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
   private val driverDockerImage = submissionSparkConf.get(DRIVER_DOCKER_IMAGE)
 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index a6f2fdcd3c710..870ab7e2d9eaa 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -120,7 +120,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   private val memoryOverheadMiB = conf
     .get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD)
-    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
+    .getOrElse(math.max(
+      (conf.get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt,
       MEMORY_OVERHEAD_MIN_MIB))
   private val executorMemoryWithOverheadMiB = executorMemoryMiB + memoryOverheadMiB
 