@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
 import io.fabric8.kubernetes.api.model._
 import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
 import io.fabric8.kubernetes.client.Watcher.Action
-import scala.collection.{concurrent, mutable}
+import scala.collection.mutable
 import scala.collection.JavaConverters._
 import scala.concurrent.{ExecutionContext, Future}
 
@@ -56,14 +56,9 @@ private[spark] class KubernetesClusterSchedulerBackend(
   private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
   // Indexed by executor pod names and guarded by RUNNING_EXECUTOR_PODS_LOCK.
   private val runningPodsToExecutors = new mutable.HashMap[String, String]
-  // TODO(varun): Get rid of this lock object by my making the underlying map a concurrent hash map.
-  private val EXECUTOR_PODS_BY_IPS_LOCK = new Object
-  // Indexed by executor IP addrs and guarded by EXECUTOR_PODS_BY_IPS_LOCK
-  private val executorPodsByIPs = new mutable.HashMap[String, Pod]
-  private val podsWithKnownExitReasons: concurrent.Map[String, ExecutorExited] =
-    new ConcurrentHashMap[String, ExecutorExited]().asScala
-  private val disconnectedPodsByExecutorIdPendingRemoval =
-    new ConcurrentHashMap[String, Pod]().asScala
+  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
+  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, ExecutorExited]()
+  private val disconnectedPodsByExecutorIdPendingRemoval = new ConcurrentHashMap[String, Pod]()
 
   private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
 
@@ -102,13 +97,13 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   private val podAllocationInterval = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
   require(podAllocationInterval > 0, s"Allocation batch delay " +
-    s"${KUBERNETES_ALLOCATION_BATCH_DELAY} " +
-    s"is ${podAllocationInterval}, should be a positive integer")
+    s"$KUBERNETES_ALLOCATION_BATCH_DELAY " +
+    s"is $podAllocationInterval, should be a positive integer")
 
   private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
   require(podAllocationSize > 0, s"Allocation batch size " +
-    s"${KUBERNETES_ALLOCATION_BATCH_SIZE} " +
-    s"is ${podAllocationSize}, should be a positive integer")
+    s"$KUBERNETES_ALLOCATION_BATCH_SIZE " +
+    s"is $podAllocationSize, should be a positive integer")
 
   private val allocatorRunnable = new Runnable {
 
@@ -141,10 +136,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
       // For each disconnected executor, synchronize with the loss reasons that may have been found
       // by the executor pod watcher. If the loss reason was discovered by the watcher,
       // inform the parent class with removeExecutor.
-      val disconnectedPodsByExecutorIdPendingRemovalCopy =
-        Map.empty ++ disconnectedPodsByExecutorIdPendingRemoval
-      disconnectedPodsByExecutorIdPendingRemovalCopy.foreach { case (executorId, executorPod) =>
-        val knownExitReason = podsWithKnownExitReasons.remove(executorPod.getMetadata.getName)
+      disconnectedPodsByExecutorIdPendingRemoval.keys().asScala.foreach { case (executorId) =>
+        val executorPod = disconnectedPodsByExecutorIdPendingRemoval.get(executorId)
+        val knownExitReason = Option(podsWithKnownExitReasons.remove(
+          executorPod.getMetadata.getName))
         knownExitReason.fold {
           removeExecutorOrIncrementLossReasonCheckCount(executorId)
         } { executorExited =>
@@ -171,7 +166,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
   }
 
   def deleteExecutorFromClusterAndDataStructures(executorId: String): Unit = {
-    disconnectedPodsByExecutorIdPendingRemoval -= executorId
+    disconnectedPodsByExecutorIdPendingRemoval.remove(executorId)
     executorReasonCheckAttemptCounts -= executorId
     RUNNING_EXECUTOR_PODS_LOCK.synchronized {
      runningExecutorsToPods.remove(executorId).map { pod =>
@@ -239,9 +234,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
       runningExecutorsToPods.clear()
       runningPodsToExecutors.clear()
     }
-    EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
-      executorPodsByIPs.clear()
-    }
+    executorPodsByIPs.clear()
     val resource = executorWatchResource.getAndSet(null)
     if (resource != null) {
       resource.close()
@@ -262,14 +255,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
    * locality if an executor launches on the cluster node.
    */
   private def getNodesWithLocalTaskCounts(): Map[String, Int] = {
-    val executorPodsWithIPs = EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
-      executorPodsByIPs.values.toList // toList makes a defensive copy.
-    }
     val nodeToLocalTaskCount = mutable.Map[String, Int]() ++
       KubernetesClusterSchedulerBackend.this.synchronized {
         hostToLocalTaskCount
       }
-    for (pod <- executorPodsWithIPs) {
+    for (pod <- executorPodsByIPs.values().asScala) {
       // Remove cluster nodes that are running our executors already.
       // TODO: This prefers spreading out executors across nodes. In case users want
       // consolidating executors on fewer nodes, introduce a flag. See the spark.deploy.spreadOut
@@ -319,7 +309,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
         val maybeRemovedExecutor = runningExecutorsToPods.remove(executor)
         maybeRemovedExecutor.foreach { executorPod =>
           kubernetesClient.pods().delete(executorPod)
-          disconnectedPodsByExecutorIdPendingRemoval(executor) = executorPod
+          disconnectedPodsByExecutorIdPendingRemoval.put(executor, executorPod)
           runningPodsToExecutors.remove(executorPod.getMetadata.getName)
         }
         if (maybeRemovedExecutor.isEmpty) {
@@ -331,9 +321,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
   }
 
   def getExecutorPodByIP(podIP: String): Option[Pod] = {
-    EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
-      executorPodsByIPs.get(podIP)
-    }
+    // Note: Per https://github.com/databricks/scala-style-guide#concurrency, we don't
+    // want to be switching to scala.collection.concurrent.Map on
+    // executorPodsByIPs.
+    val pod = executorPodsByIPs.get(podIP)
+    Option(pod)
   }
 
   private class ExecutorPodsWatcher extends Watcher[Pod] {
@@ -346,18 +338,14 @@ private[spark] class KubernetesClusterSchedulerBackend(
         val podIP = pod.getStatus.getPodIP
         val clusterNodeName = pod.getSpec.getNodeName
         logDebug(s"Executor pod $pod ready, launched at $clusterNodeName as IP $podIP.")
-        EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
-          executorPodsByIPs += ((podIP, pod))
-        }
+        executorPodsByIPs.put(podIP, pod)
       } else if ((action == Action.MODIFIED && pod.getMetadata.getDeletionTimestamp != null) ||
           action == Action.DELETED || action == Action.ERROR) {
         val podName = pod.getMetadata.getName
         val podIP = pod.getStatus.getPodIP
         logDebug(s"Executor pod $podName at IP $podIP was at $action.")
         if (podIP != null) {
-          EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
-            executorPodsByIPs -= podIP
-          }
+          executorPodsByIPs.remove(podIP)
         }
         if (action == Action.ERROR) {
           logInfo(s"Received pod $podName exited event. Reason: " + pod.getStatus.getReason)
@@ -445,7 +433,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
       if (disableExecutor(executorId)) {
         RUNNING_EXECUTOR_PODS_LOCK.synchronized {
           runningExecutorsToPods.get(executorId).foreach { pod =>
-            disconnectedPodsByExecutorIdPendingRemoval(executorId) = pod
+            disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
           }
         }
       }
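The pattern applied throughout this diff is the same in every hunk: a `mutable.HashMap` guarded by a dedicated lock object is replaced with a `java.util.concurrent.ConcurrentHashMap`, lookups are wrapped in `Option` because the Java map returns `null` for missing keys, and iteration goes through a `JavaConverters.asScala` view. A minimal, self-contained sketch of that pattern follows; the object name, map, and keys here are illustrative only and are not part of the scheduler backend:

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

object ConcurrentMapPatternSketch {
  // Hypothetical stand-in for the pod bookkeeping maps in the scheduler backend.
  private val podsByIP = new ConcurrentHashMap[String, String]()

  def main(args: Array[String]): Unit = {
    // put/remove are atomic on ConcurrentHashMap, so no external lock object is needed.
    podsByIP.put("10.0.0.1", "executor-pod-1")

    // get returns null for a missing key; wrapping in Option keeps Scala call sites safe.
    val maybePod: Option[String] = Option(podsByIP.get("10.0.0.2"))
    println(maybePod) // None

    // Iteration uses an asScala view; it is weakly consistent rather than a frozen snapshot,
    // which is why the old defensive-copy code can be dropped.
    for ((ip, pod) <- podsByIP.asScala) {
      println(s"$ip -> $pod")
    }

    podsByIP.remove("10.0.0.1")
  }
}
```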