Skip to content

Commit f43d5b9

Browse files
committed
Merged branch k8s-executor-resources
Squashed commit of the following: commit 253082b86748476a70415698514a8a897ca793f5 Author: Enrico Minack <github@enrico.minack.dev> Date: Tue Feb 17 16:46:00 2026 +0100 Allow executor feature steps to create resources
1 parent d25b191 commit f43d5b9

File tree

2 files changed

+32
-1
lines changed

2 files changed

+32
-1
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,8 @@ class ExecutorPodsAllocator(
475475
if (driverResources.nonEmpty && driverPod.nonEmpty) {
476476
addOwnerReference(driverPod.get, driverResources)
477477
}
478-
kubernetesClient.resourceList(resources: _*).forceConflicts().serverSideApply()
478+
Option(kubernetesClient.resourceList(resources: _*))
479+
.map(_.forceConflicts().serverSideApply())
479480
resources
480481
.filter(_.getKind == "PersistentVolumeClaim")
481482
.foreach { resource =>

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -781,6 +781,36 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
781781
assert(!podsAllocatorUnderTest.isDeleted("7"))
782782
}
783783

784+
test("executor feature steps can create resources") {
785+
val service = new ServiceBuilder()
786+
.withNewMetadata()
787+
.withName("servicename")
788+
.endMetadata()
789+
.build()
790+
791+
when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr),
792+
// have the feature step define a kubernetes service (resource)
793+
meq(kubernetesClient), any(classOf[ResourceProfile])))
794+
.thenAnswer((invocation: InvocationOnMock) => {
795+
val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)
796+
KubernetesExecutorSpec(
797+
executorPodWithId(k8sConf.executorId.toInt, k8sConf.resourceProfileId),
798+
Seq(service))
799+
})
800+
801+
val startTime = Instant.now.toEpochMilli
802+
waitForExecutorPodsClock.setTime(startTime)
803+
804+
// Scale up to one executor
805+
podsAllocatorUnderTest.setTotalExpectedExecutors(
806+
Map(defaultProfile -> 1))
807+
assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 1)
808+
verify(podsWithNamespace).resource(podWithAttachedContainerForId(1))
809+
810+
// service is considered for creation
811+
verify(kubernetesClient, times(1)).resourceList(meq(service))
812+
}
813+
784814
test("executor feature steps resources ownership") {
785815
val executorMetadata = mock[ObjectMeta]
786816
when(executorMetadata.getName).thenReturn("executor-name")

0 commit comments

Comments
 (0)