From 7f06745fe73ed6a292d73e05e89d6c6b23700459 Mon Sep 17 00:00:00 2001 From: duyanghao <1294057873@qq.com> Date: Fri, 1 Sep 2017 14:41:07 +0800 Subject: [PATCH 1/2] Add Support for some kubernetes client settings Add Support for `spark.kubernetes.client.watch.reconnectInterval`(which is relevant to KUBERNETES_WATCH_RECONNECT_INTERVAL_SYSTEM_PROPERTY in io.fabric8.kubernetes.client.Config.) Add Support for `spark.kubernetes.client.watch.reconnectLimit`(which is relevant to KUBERNETES_WATCH_RECONNECT_LIMIT_SYSTEM_PROPERTY in io.fabric8.kubernetes.client.Config.) Add Support for `spark.kubernetes.client.connection.timeout`(which is relevant to KUBERNETES_CONNECTION_TIMEOUT_SYSTEM_PROPERTY in io.fabric8.kubernetes.client.Config.) Add Support for `spark.kubernetes.client.request.timeout`(which is relevant to KUBERNETES_REQUEST_TIMEOUT_SYSTEM_PROPERTY in io.fabric8.kubernetes.client.Config.) Signed-off-by: duyanghao <1294057873@qq.com> --- docs/running-on-kubernetes.md | 28 +++++++++++++++++++ .../SparkKubernetesClientFactory.scala | 12 ++++++++ .../spark/deploy/kubernetes/config.scala | 24 ++++++++++++++++ 3 files changed, 64 insertions(+) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index b44857fffcdfa..7bb41e530af4c 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -798,6 +798,34 @@ from the other deployment modes. See the [configuration page](configuration.html the Driver process. The user can specify multiple of these to set multiple environment variables. + + spark.kubernetes.client.watch.reconnectIntervalInMs + 1s + + Connection retry interval for kubernetes client requests. + + + + spark.kubernetes.client.watch.reconnectLimit + -1 + + Limit of times connections can be attempted for kubernetes client requests. + + + + spark.kubernetes.client.connection.timeoutInMs + 10s + + Connection timeout for kubernetes client requests. + + + + spark.kubernetes.client.request.timeoutInMs + 10s + + Request timeout for kubernetes client requests. + + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkKubernetesClientFactory.scala index d2729a2db2fa0..5ecbc2f4a0e9e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkKubernetesClientFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkKubernetesClientFactory.scala @@ -81,6 +81,18 @@ private[spark] object SparkKubernetesClientFactory { }.withOption(namespace) { (ns, configBuilder) => configBuilder.withNamespace(ns) }.build() + val connTimeoutPropertiesMs = sparkConf.get( + KUBERNETES_CLIENT_CONNECTION_TIMEOUT_SYSTEM_PROPERTY) + config.setConnectionTimeout(connTimeoutPropertiesMs.toInt) + val reqTimeoutPropertiesMs = sparkConf.get( + KUBERNETES_CLIENT_REQUEST_TIMEOUT_SYSTEM_PROPERTY) + config.setRequestTimeout(reqTimeoutPropertiesMs.toInt) + val reconnectIntervalPropertiesMs = sparkConf.get( + KUBERNETES_CLIENT_WATCH_RECONNECT_INTERVAL_SYSTEM_PROPERTY) + config.setWatchReconnectInterval(reconnectIntervalPropertiesMs.toInt) + val reconnectLimitProperties = sparkConf.get( + KUBERNETES_CLIENT_WATCH_RECONNECT_LIMIT_SYSTEM_PROPERTY) + config.setWatchReconnectLimit(reconnectLimitProperties) val baseHttpClient = HttpClientUtils.createHttpClient(config) val httpClientWithCustomDispatcher = baseHttpClient.newBuilder() .dispatcher(dispatcher) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index 53a184cba7a4d..5957b19a2ea7b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -545,4 +545,28 @@ package object config extends Logging { resolvedURL } } + + private[spark] val KUBERNETES_CLIENT_WATCH_RECONNECT_INTERVAL_SYSTEM_PROPERTY = + ConfigBuilder("spark.kubernetes.client.watch.reconnectIntervalInMs") + .doc("Connection retry interval for kubernetes client requests.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("1s") + + private[spark] val KUBERNETES_CLIENT_WATCH_RECONNECT_LIMIT_SYSTEM_PROPERTY = + ConfigBuilder("spark.kubernetes.client.watch.reconnectLimit") + .doc("Limit of times connections can be attempted for kubernetes client requests.") + .intConf + .createWithDefault(-1) + + private[spark] val KUBERNETES_CLIENT_CONNECTION_TIMEOUT_SYSTEM_PROPERTY = + ConfigBuilder("spark.kubernetes.client.connection.timeoutInMs") + .doc("Connection timeout for kubernetes client requests.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("10s") + + private[spark] val KUBERNETES_CLIENT_REQUEST_TIMEOUT_SYSTEM_PROPERTY = + ConfigBuilder("spark.kubernetes.client.request.timeoutInMs") + .doc("Request timeout for kubernetes client requests.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("10s") } From 81c458688b00c5d0147664bfc74a14a55de3fd7f Mon Sep 17 00:00:00 2001 From: duyanghao <1294057873@qq.com> Date: Wed, 14 Mar 2018 14:53:41 +0800 Subject: [PATCH 2/2] Add Recovery Logic for Failed Pod Signed-off-by: duyanghao <1294057873@qq.com> --- .../KubernetesClusterSchedulerBackend.scala | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala index d30c88fcc74bf..e36a8763de7b3 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala @@ -339,6 +339,21 @@ private[spark] class KubernetesClusterSchedulerBackend( val clusterNodeName = pod.getSpec.getNodeName logDebug(s"Executor pod $pod ready, launched at $clusterNodeName as IP $podIP.") executorPodsByIPs.put(podIP, pod) + } else if (action == Action.MODIFIED && pod.getStatus.getPhase == "Failed" + && pod.getMetadata.getDeletionTimestamp == null) { + val podName = pod.getMetadata.getName + val podIP = pod.getStatus.getPodIP + if (podIP != null) { + executorPodsByIPs.remove(podIP) + } + RUNNING_EXECUTOR_PODS_LOCK.synchronized { + runningPodsToExecutors.get(podName).foreach { executorId => + disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod) + logInfo(s"executor $executorId Failed") + } + } + logInfo(s"Received pod $podName failed event. Reason: " + pod.getStatus.getReason) + handleErroredPod(pod) } else if ((action == Action.MODIFIED && pod.getMetadata.getDeletionTimestamp != null) || action == Action.DELETED || action == Action.ERROR) { val podName = pod.getMetadata.getName @@ -393,7 +408,7 @@ private[spark] class KubernetesClusterSchedulerBackend( " exited from explicit termination request.") } else { val containerExitReason = containerExitStatus match { - case VMEM_EXCEEDED_EXIT_CODE | PMEM_EXCEEDED_EXIT_CODE => + case MEM_EXCEEDED_EXIT_CODE => memLimitExceededLogMessage(pod.getStatus.getReason) case _ => // Here we can't be sure that that exit was caused by the application but this seems @@ -474,8 +489,7 @@ private[spark] class KubernetesClusterSchedulerBackend( } private object KubernetesClusterSchedulerBackend { - private val VMEM_EXCEEDED_EXIT_CODE = -103 - private val PMEM_EXCEEDED_EXIT_CODE = -104 + private val MEM_EXCEEDED_EXIT_CODE = 137 private val UNKNOWN_EXIT_CODE = -111 // Number of times we are allowed check for the loss reason for an executor before we give up // and assume the executor failed for good, and attribute it to a framework fault.