This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 1429d01

Fix unit tests
1 parent c8312b3 commit 1429d01

2 files changed: 35 additions & 12 deletions


resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala

Lines changed: 22 additions & 0 deletions
@@ -427,6 +427,28 @@ private[spark] class KubernetesClusterSchedulerBackend(
       podsWithKnownExitReasons.put(pod.getMetadata.getName, exitReason)
     }
   }
+
+  override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
+    new KubernetesDriverEndpoint(rpcEnv, properties)
+  }
+
+  private class KubernetesDriverEndpoint(
+      rpcEnv: RpcEnv,
+      sparkProperties: Seq[(String, String)])
+    extends DriverEndpoint(rpcEnv, sparkProperties) {
+
+    override def onDisconnected(rpcAddress: RpcAddress): Unit = {
+      addressToExecutorId.get(rpcAddress).foreach { executorId =>
+        if (disableExecutor(executorId)) {
+          RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+            runningExecutorsToPods.get(executorId).foreach { pod =>
+              disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
+            }
+          }
+        }
+      }
+    }
+  }
 }
 
 private object KubernetesClusterSchedulerBackend {
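
The new KubernetesDriverEndpoint hooks into the base scheduler backend through the createDriverEndpoint factory method, and its onDisconnected override queues the executor's pod for later inspection rather than tearing it down on the spot. Below is a minimal, self-contained sketch of that pattern; RpcAddress, Pod, DriverEndpoint, and the bookkeeping maps are simplified stand-ins for the real Spark and fabric8 types, not the actual classes.

import java.util.concurrent.ConcurrentHashMap

object DisconnectHandlingSketch {
  final case class RpcAddress(host: String, port: Int)
  final case class Pod(name: String)

  // Stand-in for Spark's DriverEndpoint; only the hook we override matters.
  class DriverEndpoint {
    def onDisconnected(rpcAddress: RpcAddress): Unit = ()
  }

  private val addressToExecutorId = new ConcurrentHashMap[RpcAddress, String]()
  private val runningExecutorsToPods = new ConcurrentHashMap[String, Pod]()
  private val disconnectedPodsByExecutorIdPendingRemoval = new ConcurrentHashMap[String, Pod]()
  private val RUNNING_EXECUTOR_PODS_LOCK = new Object

  private def disableExecutor(executorId: String): Boolean = true // stand-in for the real bookkeeping

  // Mirrors the control flow of the change: look up which executor lived at
  // the dropped address and, if it can be disabled, remember its pod so a
  // later pass can work out the exit reason.
  class KubernetesDriverEndpoint extends DriverEndpoint {
    override def onDisconnected(rpcAddress: RpcAddress): Unit = {
      Option(addressToExecutorId.get(rpcAddress)).foreach { executorId =>
        if (disableExecutor(executorId)) {
          RUNNING_EXECUTOR_PODS_LOCK.synchronized {
            Option(runningExecutorsToPods.get(executorId)).foreach { pod =>
              disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
            }
          }
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val addr = RpcAddress("10.0.0.5", 7078)
    addressToExecutorId.put(addr, "1")
    runningExecutorsToPods.put("1", Pod("exec-1"))
    new KubernetesDriverEndpoint().onDisconnected(addr)
    assert(disconnectedPodsByExecutorIdPendingRemoval.containsKey("1"))
    println("pod exec-1 queued for removal after disconnect")
  }
}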

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackendSuite.scala

Lines changed: 13 additions & 12 deletions
@@ -18,22 +18,23 @@ package org.apache.spark.scheduler.cluster.kubernetes
 
 import java.util.concurrent.{ExecutorService, ScheduledExecutorService, TimeUnit}
 
+import scala.collection.JavaConverters._
+import scala.concurrent.Future
+
 import io.fabric8.kubernetes.api.model.{DoneablePod, Pod, PodBuilder, PodList}
 import io.fabric8.kubernetes.client.{KubernetesClient, Watch, Watcher}
 import io.fabric8.kubernetes.client.Watcher.Action
 import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NonNamespaceOperation, PodResource}
 import org.mockito.{AdditionalAnswers, ArgumentCaptor, Mock, MockitoAnnotations}
 import org.mockito.Matchers.{any, eq => mockitoEq}
-import org.mockito.Mockito.{doNothing, never, times, verify, when}
+import org.mockito.Mockito.{mock => _, _}
 import org.scalatest.BeforeAndAfter
 import org.scalatest.mock.MockitoSugar._
-import scala.collection.JavaConverters._
-import scala.concurrent.Future
 
 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpoint, RpcEndpointAddress, RpcEndpointRef, RpcEnv, RpcTimeout}
+import org.apache.spark.rpc._
 import org.apache.spark.scheduler.{ExecutorExited, LiveListenerBus, SlaveLost, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RemoveExecutor}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
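
The import swap to org.mockito.Mockito.{mock => _, _} uses Scala's selective-import syntax: renaming a member to _ excludes it, while the trailing wildcard brings in every other member. That lets the suite pull in Mockito's statics wholesale without its Java-style mock shadowing MockitoSugar's Scala-friendly mock[T]. A tiny sketch of the mechanism, using a hypothetical Lib object rather than Mockito itself:

object SelectiveImportSketch {
  object Lib {
    def mock(): String = "java-style mock"
    def verify(): String = "verified"
  }

  import Lib.{mock => _, _} // everything from Lib except `mock` is now in scope

  def main(args: Array[String]): Unit = {
    println(verify()) // compiles: `verify` was imported by the wildcard
    // println(mock()) // would not compile: `mock` was explicitly excluded
  }
}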
@@ -174,7 +175,7 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
   }
 
   test("Basic lifecycle expectations when starting and stopping the scheduler.") {
-    val scheduler = newSchedulerBackend(true)
+    val scheduler = newSchedulerBackend()
     scheduler.start()
     assert(executorPodsWatcherArgument.getValue != null)
     assert(allocatorRunnable.getValue != null)
@@ -186,7 +187,7 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
     sparkConf
       .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 2)
       .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2)
-    val scheduler = newSchedulerBackend(true)
+    val scheduler = newSchedulerBackend()
     scheduler.start()
     requestExecutorRunnable.getValue.run()
     expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
@@ -201,7 +202,7 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
     sparkConf
       .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 2)
      .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2)
-    val scheduler = newSchedulerBackend(true)
+    val scheduler = newSchedulerBackend()
     scheduler.start()
     requestExecutorRunnable.getValue.run()
     expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
@@ -219,7 +220,7 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
     sparkConf
       .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
       .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2)
-    val scheduler = newSchedulerBackend(true)
+    val scheduler = newSchedulerBackend()
     scheduler.start()
     requestExecutorRunnable.getValue.run()
     when(podOperations.create(any(classOf[Pod])))
@@ -243,7 +244,7 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
     sparkConf
       .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
       .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
-    val scheduler = newSchedulerBackend(true)
+    val scheduler = newSchedulerBackend()
     scheduler.start()
     requestExecutorRunnable.getValue.run()
     when(podOperations.create(any(classOf[Pod])))
@@ -280,7 +281,7 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
     sparkConf
       .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
       .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
-    val scheduler = newSchedulerBackend(true)
+    val scheduler = newSchedulerBackend()
     scheduler.start()
     expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
     when(podOperations.create(any(classOf[Pod]))).thenAnswer(AdditionalAnswers.returnsFirstArg())
@@ -315,7 +316,7 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
     sparkConf
       .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
       .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
-    val scheduler = newSchedulerBackend(true)
+    val scheduler = newSchedulerBackend()
     scheduler.start()
     expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
     when(podOperations.create(any(classOf[Pod]))).thenAnswer(AdditionalAnswers.returnsFirstArg())
@@ -340,7 +341,7 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
       RemoveExecutor("1", SlaveLost("Executor lost for unknown reasons.")))
   }
 
-  private def newSchedulerBackend(externalShuffle: Boolean): KubernetesClusterSchedulerBackend = {
+  private def newSchedulerBackend(): KubernetesClusterSchedulerBackend = {
    new KubernetesClusterSchedulerBackend(
      taskSchedulerImpl,
      rpcEnv,
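
The last hunk drops the externalShuffle parameter from the test helper, since no test ever branched on it; every call site shrinks from newSchedulerBackend(true) to newSchedulerBackend(). A minimal sketch of that cleanup, with Backend as an illustrative stand-in for the real scheduler backend:

object HelperCleanup {
  final case class Backend(started: Boolean = false)

  // Before: private def newSchedulerBackend(externalShuffle: Boolean): Backend
  // The flag was never read inside the helper, so the parameter is dropped.
  private def newSchedulerBackend(): Backend = Backend()

  def main(args: Array[String]): Unit = {
    val scheduler = newSchedulerBackend()
    assert(!scheduler.started)
    println("backend built without the unused externalShuffle flag")
  }
}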
