@@ -18,18 +18,20 @@ package org.apache.spark.scheduler.cluster.kubernetes

import java.io.Closeable
import java.net.InetAddress
- import java.util.concurrent.TimeUnit
+ import java.util.Collections
+ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}

+ import scala.collection.{concurrent, mutable}
+ import scala.collection.JavaConverters._
+ import scala.concurrent.{ExecutionContext, Future}
+
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
- import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder}
+ import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import org.apache.commons.io.FilenameUtils
- import scala.collection.JavaConverters._
- import scala.collection.mutable
- import scala.concurrent.{ExecutionContext, Future}

import org.apache.spark.{SparkContext, SparkEnv, SparkException}
import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap}
@@ -38,8 +40,8 @@ import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.submit.InitContainerUtil
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient
- import org.apache.spark.rpc.{RpcCallContext, RpcEndpointAddress, RpcEnv}
- import org.apache.spark.scheduler.TaskSchedulerImpl
+ import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpointAddress, RpcEnv}
+ import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RetrieveSparkAppConfig, SparkAppConfig}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.{ThreadUtils, Utils}
@@ -55,10 +57,18 @@ private[spark] class KubernetesClusterSchedulerBackend(
  import KubernetesClusterSchedulerBackend._

  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
-   private val runningExecutorPods = new mutable.HashMap[String, Pod] // Indexed by executor IDs.
-
+   // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+   private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+   // Indexed by executor pod names and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+   private val runningPodsToExecutors = new mutable.HashMap[String, String]
+   // TODO(varun): Get rid of this lock object by making the underlying map a concurrent hash map.
  private val EXECUTOR_PODS_BY_IPS_LOCK = new Object
-   private val executorPodsByIPs = new mutable.HashMap[String, Pod] // Indexed by executor IP addrs.
+   // Indexed by executor IP addresses and guarded by EXECUTOR_PODS_BY_IPS_LOCK.
+   private val executorPodsByIPs = new mutable.HashMap[String, Pod]
+   private val failedPods: concurrent.Map[String, ExecutorExited] = new
+     ConcurrentHashMap[String, ExecutorExited]().asScala
+   private val executorsToRemove = Collections.newSetFromMap[String](
+     new ConcurrentHashMap[String, java.lang.Boolean]()).asScala

  private val executorExtraClasspath = conf.get(
    org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
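For readers unfamiliar with the idiom behind the new `failedPods` and `executorsToRemove` fields above: a Java `ConcurrentHashMap` is wrapped into Scala's collection interfaces so that the pod watcher thread, the RPC endpoint, and the allocator can all update these structures without an explicit lock. A minimal, self-contained sketch of that pattern (names here are illustrative, not taken from the patch):

```scala
import java.util.Collections
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
import scala.collection.concurrent
import scala.collection.mutable

object ConcurrentCollectionsSketch {
  // Thread-safe map: a ConcurrentHashMap viewed through scala.collection.concurrent.Map.
  val failureReasons: concurrent.Map[String, String] =
    new ConcurrentHashMap[String, String]().asScala

  // Thread-safe set: the JDK has no ConcurrentHashSet, so a ConcurrentHashMap is adapted
  // with Collections.newSetFromMap and then wrapped as a mutable Scala Set.
  val pendingRemovals: mutable.Set[String] =
    Collections.newSetFromMap[String](new ConcurrentHashMap[String, java.lang.Boolean]()).asScala

  def main(args: Array[String]): Unit = {
    failureReasons.put("exec-1", "OOMKilled") // safe to call from any thread
    pendingRemovals.add("exec-1")
    println(failureReasons.get("exec-1"), pendingRemovals.contains("exec-1"))
  }
}
```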
@@ -135,7 +145,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
      val parsedShuffleLabels = ConfigurationUtils.parseKeyValuePairs(
        conf.get(KUBERNETES_SHUFFLE_LABELS), KUBERNETES_SHUFFLE_LABELS.key,
        "shuffle-labels")
-       if (parsedShuffleLabels.size == 0) {
+       if (parsedShuffleLabels.isEmpty) {
        throw new SparkException(s"Dynamic allocation enabled " +
          s"but no ${KUBERNETES_SHUFFLE_LABELS.key} specified")
      }
@@ -170,12 +180,13 @@ private[spark] class KubernetesClusterSchedulerBackend(
  private val executorWatchResource = new AtomicReference[Closeable]
  protected var totalExpectedExecutors = new AtomicInteger(0)

+
  private val driverUrl = RpcEndpointAddress(
    sc.getConf.get("spark.driver.host"),
    sc.getConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
    CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString

-   private val initialExecutors = getInitialTargetExecutorNumber(1)
+   private val initialExecutors = getInitialTargetExecutorNumber()

  private val podAllocationInterval = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
  require(podAllocationInterval > 0, s"Allocation batch delay " +
@@ -192,23 +203,74 @@ private[spark] class KubernetesClusterSchedulerBackend(

  private val allocatorRunnable: Runnable = new Runnable {

+     // Number of times we are allowed to check for the loss reason for an executor before we
+     // give up, assume the executor failed for good, and attribute it to a framework fault.
+     private val MAX_EXECUTOR_LOST_REASON_CHECKS = 10
+     private val executorsToRecover = new mutable.HashSet[String]
+     // Maintains a map of executor id to count of checks performed to learn the loss reason
+     // for an executor.
+     private val executorReasonChecks = new mutable.HashMap[String, Int]
+
    override def run(): Unit = {
-       if (totalRegisteredExecutors.get() < runningExecutorPods.size) {
-         logDebug("Waiting for pending executors before scaling")
-       } else if (totalExpectedExecutors.get() <= runningExecutorPods.size) {
-         logDebug("Maximum allowed executor limit reached. Not scaling up further.")
-       } else {
-         val nodeToLocalTaskCount = getNodesWithLocalTaskCounts
-         RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+       removeFailedExecutors()
+       RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+         if (totalRegisteredExecutors.get() < runningExecutorsToPods.size) {
+           logDebug("Waiting for pending executors before scaling")
+         } else if (totalExpectedExecutors.get() <= runningExecutorsToPods.size) {
+           logDebug("Maximum allowed executor limit reached. Not scaling up further.")
+         } else {
+           val nodeToLocalTaskCount = getNodesWithLocalTaskCounts
          for (i <- 0 until math.min(
-             totalExpectedExecutors.get - runningExecutorPods.size, podAllocationSize)) {
-             runningExecutorPods += allocateNewExecutorPod(nodeToLocalTaskCount)
+             totalExpectedExecutors.get - runningExecutorsToPods.size, podAllocationSize)) {
+             val (executorId, pod) = allocateNewExecutorPod(nodeToLocalTaskCount)
+             runningExecutorsToPods.put(executorId, pod)
+             runningPodsToExecutors.put(pod.getMetadata.getName, executorId)
            logInfo(
-               s"Requesting a new executor, total executors is now ${runningExecutorPods.size}")
+               s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}")
          }
        }
      }
    }
+
+     def removeFailedExecutors(): Unit = {
+       val localRunningExecutorsToPods = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+         runningExecutorsToPods.toMap
+       }
+       executorsToRemove.foreach { case (executorId) =>
+         localRunningExecutorsToPods.get(executorId).map { pod: Pod =>
+           failedPods.get(pod.getMetadata.getName).map { executorExited: ExecutorExited =>
+             logDebug(s"Removing executor $executorId with loss reason " + executorExited.message)
+             removeExecutor(executorId, executorExited)
+             if (!executorExited.exitCausedByApp) {
+               executorsToRecover.add(executorId)
+             }
+           }.getOrElse(removeExecutorOrIncrementLossReasonCheckCount(executorId))
+         }.getOrElse(removeExecutorOrIncrementLossReasonCheckCount(executorId))
+
+         executorsToRecover.foreach(executorId => {
+           executorsToRemove -= executorId
+           executorReasonChecks -= executorId
+           RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+             runningExecutorsToPods.remove(executorId).map { pod: Pod =>
+               kubernetesClient.pods().delete(pod)
+               runningPodsToExecutors.remove(pod.getMetadata.getName)
+             }.getOrElse(logWarning(s"Unable to remove pod for unknown executor $executorId"))
+           }
+         })
+         executorsToRecover.clear()
+       }
+     }
+
+     def removeExecutorOrIncrementLossReasonCheckCount(executorId: String): Unit = {
+       val reasonCheckCount = executorReasonChecks.getOrElse(executorId, 0)
+       if (reasonCheckCount > MAX_EXECUTOR_LOST_REASON_CHECKS) {
+         removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons"))
+         executorsToRecover.add(executorId)
+         executorReasonChecks -= executorId
+       } else {
+         executorReasonChecks.put(executorId, reasonCheckCount + 1)
+       }
+     }
  }

  private val objectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
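The `removeExecutorOrIncrementLossReasonCheckCount` helper added above caps how many allocator passes may go by without a recorded loss reason before the executor is written off as `SlaveLost`. A small stand-alone sketch of that counting pattern (class and method names are mine, not the PR's):

```scala
import scala.collection.mutable

// Sketch of the "give up after N unanswered checks" bookkeeping used by the allocator;
// maxChecks plays the role of MAX_EXECUTOR_LOST_REASON_CHECKS.
class LossReasonChecks(maxChecks: Int) {
  private val checks = new mutable.HashMap[String, Int]

  // Returns true once an executor has been polled more than maxChecks times without a
  // recorded loss reason, i.e. the point at which the backend would report SlaveLost.
  def shouldGiveUp(executorId: String): Boolean = {
    val count = checks.getOrElse(executorId, 0)
    if (count > maxChecks) {
      checks -= executorId // reset the bookkeeping once we give up
      true
    } else {
      checks.put(executorId, count + 1)
      false
    }
  }
}
```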
@@ -280,8 +342,9 @@ private[spark] class KubernetesClusterSchedulerBackend(
    // indication as to why.
    try {
      RUNNING_EXECUTOR_PODS_LOCK.synchronized {
-         runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_))
-         runningExecutorPods.clear()
+         runningExecutorsToPods.values.foreach(kubernetesClient.pods().delete(_))
+         runningExecutorsToPods.clear()
+         runningPodsToExecutors.clear()
      }
      EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
        executorPodsByIPs.clear()
@@ -534,11 +597,6 @@ private[spark] class KubernetesClusterSchedulerBackend(
    }
  }

-   override def createDriverEndpoint(
-       properties: Seq[(String, String)]): DriverEndpoint = {
-     new KubernetesDriverEndpoint(rpcEnv, properties)
-   }
-
  override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] {
    totalExpectedExecutors.set(requestedTotal)
    true
@@ -547,8 +605,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
  override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
    RUNNING_EXECUTOR_PODS_LOCK.synchronized {
      for (executor <- executorIds) {
-         runningExecutorPods.remove(executor) match {
-           case Some(pod) => kubernetesClient.pods().delete(pod)
+         runningExecutorsToPods.remove(executor) match {
+           case Some(pod) =>
+             kubernetesClient.pods().delete(pod)
+             runningPodsToExecutors.remove(pod.getMetadata.getName)
          case None => logWarning(s"Unable to remove pod for unknown executor $executor")
        }
      }
@@ -564,6 +624,8 @@ private[spark] class KubernetesClusterSchedulerBackend(

  private class ExecutorPodsWatcher extends Watcher[Pod] {

+     private val DEFAULT_CONTAINER_FAILURE_EXIT_STATUS = -1
+
    override def eventReceived(action: Action, pod: Pod): Unit = {
      if (action == Action.MODIFIED && pod.getStatus.getPhase == "Running"
          && pod.getMetadata.getDeletionTimestamp == null) {
@@ -583,12 +645,75 @@ private[spark] class KubernetesClusterSchedulerBackend(
            executorPodsByIPs -= podIP
          }
        }
+         if (action == Action.ERROR) {
+           logInfo(s"Received pod $podName exited event. Reason: " + pod.getStatus.getReason)
+           handleErroredPod(pod)
+         } else if (action == Action.DELETED) {
+           logInfo(s"Received delete pod $podName event. Reason: " + pod.getStatus.getReason)
+           handleDeletedPod(pod)
+         }
      }
    }

    override def onClose(cause: KubernetesClientException): Unit = {
      logDebug("Executor pod watch closed.", cause)
    }
+
+     def getExecutorExitStatus(pod: Pod): Int = {
+       val containerStatuses = pod.getStatus.getContainerStatuses
+       if (!containerStatuses.isEmpty) {
+         // We assume the first container represents the pod status. This assumption may not hold
+         // true in the future. Revisit this if side-car containers start running inside executor
+         // pods.
+         getExecutorExitStatus(containerStatuses.get(0))
+       } else DEFAULT_CONTAINER_FAILURE_EXIT_STATUS
+     }
+
+     def getExecutorExitStatus(containerStatus: ContainerStatus): Int = {
+       Option(containerStatus.getState).map(containerState =>
+         Option(containerState.getTerminated).map(containerStateTerminated =>
+           containerStateTerminated.getExitCode.intValue()).getOrElse(UNKNOWN_EXIT_CODE)
+       ).getOrElse(UNKNOWN_EXIT_CODE)
+     }
+
+     def isPodAlreadyReleased(pod: Pod): Boolean = {
+       RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+         !runningPodsToExecutors.contains(pod.getMetadata.getName)
+       }
+     }
+
+     def handleErroredPod(pod: Pod): Unit = {
+       val alreadyReleased = isPodAlreadyReleased(pod)
+       val containerExitStatus = getExecutorExitStatus(pod)
+       // If the pod was already released, the container was probably actively killed by the driver.
+       val exitReason = if (alreadyReleased) {
+         ExecutorExited(containerExitStatus, exitCausedByApp = false,
+           s"Container in pod " + pod.getMetadata.getName +
+             " exited from explicit termination request.")
+       } else {
+         val containerExitReason = containerExitStatus match {
+           case VMEM_EXCEEDED_EXIT_CODE | PMEM_EXCEEDED_EXIT_CODE =>
+             memLimitExceededLogMessage(pod.getStatus.getReason)
+           case _ =>
+             // Here we can't be sure that the exit was caused by the application, but this seems
+             // to be the right default since we know the pod was not explicitly deleted by
+             // the user.
+             "Pod exited with the following container exit status code " + containerExitStatus
+         }
+         ExecutorExited(containerExitStatus, exitCausedByApp = true, containerExitReason)
+       }
+       failedPods.put(pod.getMetadata.getName, exitReason)
+     }
+
+     def handleDeletedPod(pod: Pod): Unit = {
+       val exitReason = ExecutorExited(getExecutorExitStatus(pod), exitCausedByApp = false,
+         "Pod " + pod.getMetadata.getName + " deleted or lost.")
+       failedPods.put(pod.getMetadata.getName, exitReason)
+     }
+   }
+
+   override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
+     new KubernetesDriverEndpoint(rpcEnv, properties)
  }

  private class KubernetesDriverEndpoint(
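The `ExecutorPodsWatcher` methods added above only record failures into `failedPods`; the watcher itself is registered against the executor pods elsewhere in this class, outside the hunks shown here. A rough sketch of how a fabric8 `Watcher[Pod]` is attached to a label-selected set of pods, with the label key/value purely illustrative:

```scala
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action

object WatcherRegistrationSketch {
  def main(args: Array[String]): Unit = {
    val client = new DefaultKubernetesClient()
    // Watch only this application's executor pods; "spark-app-id"/"my-app" are placeholders.
    val watch = client.pods()
      .withLabel("spark-app-id", "my-app")
      .watch(new Watcher[Pod] {
        override def eventReceived(action: Action, pod: Pod): Unit =
          println(s"$action -> ${pod.getMetadata.getName}")
        override def onClose(cause: KubernetesClientException): Unit =
          println("watch closed")
      })
    // The returned Watch is Closeable; the scheduler backend closes its watch on stop().
    watch.close()
    client.close()
  }
}
```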
@@ -597,6 +722,14 @@ private[spark] class KubernetesClusterSchedulerBackend(
      extends DriverEndpoint(rpcEnv, sparkProperties) {
    private val externalShufflePort = conf.getInt("spark.shuffle.service.port", 7337)

+     override def onDisconnected(rpcAddress: RpcAddress): Unit = {
+       addressToExecutorId.get(rpcAddress).foreach { executorId =>
+         if (disableExecutor(executorId)) {
+           executorsToRemove.add(executorId)
+         }
+       }
+     }
+
    override def receiveAndReply(
        context: RpcCallContext): PartialFunction[Any, Unit] = {
      new PartialFunction[Any, Unit]() {
@@ -615,7 +748,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
            var resolvedProperties = sparkProperties
            val runningExecutorPod = kubernetesClient
              .pods()
-               .withName(runningExecutorPods(executorId).getMetadata.getName)
+               .withName(runningExecutorsToPods(executorId).getMetadata.getName)
              .get()
            val nodeName = runningExecutorPod.getSpec.getNodeName
            val shufflePodIp = shufflePodCache.get.getShufflePodForExecutor(nodeName)
@@ -637,7 +770,6 @@ private[spark] class KubernetesClusterSchedulerBackend(
        }.orElse(super.receiveAndReply(context))
      }
    }
-
}
case class ShuffleServiceConfig(
    shuffleNamespace: String,
@@ -647,6 +779,14 @@ case class ShuffleServiceConfig(
private object KubernetesClusterSchedulerBackend {
  private val DEFAULT_STATIC_PORT = 10000
  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+   private val VMEM_EXCEEDED_EXIT_CODE = -103
+   private val PMEM_EXCEEDED_EXIT_CODE = -104
+   private val UNKNOWN_EXIT_CODE = -111
+
+   def memLimitExceededLogMessage(diagnostics: String): String = {
+     s"Pod/Container killed for exceeding memory limits. $diagnostics" +
+       " Consider boosting spark executor memory overhead."
+   }
}
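The `-103`/`-104` constants introduced here appear to mirror the convention YARN uses for containers killed for exceeding virtual and physical memory limits, which is presumably why those exit codes are routed through `memLimitExceededLogMessage`. A trivial usage sketch (the diagnostics string below is made up):

```scala
object ExitCodeSketch {
  // Same values as in KubernetesClusterSchedulerBackend's companion object.
  val VMEM_EXCEEDED_EXIT_CODE = -103
  val PMEM_EXCEEDED_EXIT_CODE = -104

  def memLimitExceededLogMessage(diagnostics: String): String =
    s"Pod/Container killed for exceeding memory limits. $diagnostics" +
      " Consider boosting spark executor memory overhead."

  def main(args: Array[String]): Unit = {
    // An exit status of -103 or -104 would produce a message like this one.
    println(memLimitExceededLogMessage("Pod exceeded its memory request."))
  }
}
```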

/**