Skip to content

Commit 302b5fa

Browse files
committed
[KYUUBI #7101] Load the existing pods when initializing the Kubernetes client to clean up terminated app pods
### Why are the changes needed? To prevent terminated app pods from leaking when their events are missed during a Kyuubi server restart. ### How was this patch tested? Manual test. ``` 2025-06-17 17:50:37.275 INFO [main] org.apache.kyuubi.engine.KubernetesApplicationOperation: [KubernetesInfo(Some(28),Some(dls-prod))] Found existing pod kyuubi-xb406fc5-7b0b-4fdf-8531-929ed2ae250d-8998-5b406fc5-7b0b-4fdf-8531-929ed2ae250d-8998-90c0b328-930f-11ed-a1eb-0242ac120002-0-20250423211008-grectg-stm-17da59fe-caf4-41e4-a12f-6c1ed9a293f9-driver with label: kyuubi-unique-tag=17da59fe-caf4-41e4-a12f-6c1ed9a293f9 in app state FINISHED, marking it as terminated 2025-06-17 17:50:37.278 INFO [main] org.apache.kyuubi.engine.KubernetesApplicationOperation: [KubernetesInfo(Some(28),Some(dls-prod))] Found existing pod kyuubi-xb406fc5-7b0b-4fdf-8531-929ed2ae250d-8998-5b406fc5-7b0b-4fdf-8531-929ed2ae250d-8998-90c0b328-930f-11ed-a1eb-0242ac120002-0-20250423212011-gpdtsi-stm-6a23000f-10be-4a42-ae62-4fa2da8fac07-driver with label: kyuubi-unique-tag=6a23000f-10be-4a42-ae62-4fa2da8fac07 in app state FINISHED, marking it as terminated ``` The pods are cleaned up eventually. <img width="664" alt="image" src="https://github.com/user-attachments/assets/8cf58f61-065f-4fb0-9718-2e3c00e8d2e0" /> ### Was this patch authored or co-authored using generative AI tooling? No. Closes #7101 from turboFei/pod_cleanup. Closes #7101 7f76cf5 [Wang, Fei] async 11c9db2 [Wang, Fei] cleanup Authored-by: Wang, Fei <fwang12@ebay.com> Signed-off-by: Wang, Fei <fwang12@ebay.com>
1 parent 364f062 commit 302b5fa

File tree

1 file changed

+45
-1
lines changed

1 file changed

+45
-1
lines changed

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,45 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
7575

7676
// Thread pool created under the name "cleanup-canceled-app-pod-thread" next to the Kubernetes
// client initialization, and shut down and reset to null when this operation is stopped.
// NOTE(review): presumably runs pod cleanup for canceled apps; its call sites are not visible
// here, so confirm before relying on that.
private var cleanupCanceledAppPodExecutor: ThreadPoolExecutor = _
7777

78+
// Thread pool that runs the one-off scan for already-terminated app pods each time a new
// Kubernetes client is built (see cleanTerminatedAppPodsOnKubernetesClientInitialize).
// It stays null until it is assigned just before initializeKubernetesClient is called, and
// it is shut down and reset to null on stop, so every reader must tolerate null.
private var kubernetesClientInitializeCleanupTerminatedPodExecutor: ThreadPoolExecutor = _
79+
7880
/**
 * Returns the Kubernetes client registered for `kubernetesInfo`, building and registering
 * one on first use.
 *
 * `checkKubernetesInfo` is applied to the argument before the lookup. When a client has to be
 * built, a scan for already-terminated app pods is requested for it right away (see
 * `cleanTerminatedAppPodsOnKubernetesClientInitialize`) before the client is handed back.
 *
 * @param kubernetesInfo key identifying which client is wanted
 * @return the existing or newly built client for that key
 */
private def getOrCreateKubernetesClient(kubernetesInfo: KubernetesInfo): KubernetesClient = {
  checkKubernetesInfo(kubernetesInfo)
  kubernetesClients.computeIfAbsent(
    kubernetesInfo,
    missingInfo => {
      // Only reached when no client is registered yet for this key.
      val freshClient = buildKubernetesClient(missingInfo)
      cleanTerminatedAppPodsOnKubernetesClientInitialize(missingInfo, freshClient)
      freshClient
    })
}
90+
91+
/**
 * Asynchronously lists the pods that already carry the `LABEL_KYUUBI_UNIQUE_KEY` label and,
 * for every Spark engine pod whose application state is terminated, records that state and
 * marks the application as terminated.
 *
 * This covers pods whose termination events were not observed by this instance, for example
 * because they happened before the client (and its watchers) existed.
 *
 * The work is submitted to `kubernetesClientInitializeCleanupTerminatedPodExecutor`; nothing
 * is done when that executor has not been created. Failures are logged and never propagated:
 * a failure on one pod does not stop the remaining pods from being processed.
 *
 * @param kubernetesInfo   identifies the cluster/namespace the client belongs to; used for
 *                         logging and passed through to the state-update helpers
 * @param kubernetesClient client used to list the existing pods
 */
private def cleanTerminatedAppPodsOnKubernetesClientInitialize(
    kubernetesInfo: KubernetesInfo,
    kubernetesClient: KubernetesClient): Unit = {
  import scala.util.control.NonFatal

  // Read the field once: it is reset to null on stop, so checking it and then reading it
  // again could dereference null.
  val executor = kubernetesClientInitializeCleanupTerminatedPodExecutor
  if (executor != null) {
    executor.submit(new Runnable {
      override def run(): Unit = {
        // `submit` stores any exception in the returned Future, which nobody inspects, so
        // errors have to be logged here or they disappear without a trace.
        try {
          val existingPods =
            kubernetesClient.pods().withLabel(LABEL_KYUUBI_UNIQUE_KEY).list().getItems
          info(s"[$kubernetesInfo] Found ${existingPods.size()} existing pods with label " +
            s"$LABEL_KYUUBI_UNIQUE_KEY")
          val eventType = KubernetesResourceEventTypes.UPDATE
          existingPods.asScala.filter(isSparkEnginePod).foreach { pod =>
            val podName = pod.getMetadata.getName
            // Isolate each pod so one bad pod cannot block cleanup of the others.
            try {
              val appState = toApplicationState(pod, appStateSource, appStateContainer, eventType)
              if (isTerminated(appState)) {
                val kyuubiUniqueKey = pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY)
                info(s"[$kubernetesInfo] Found existing pod $podName with " +
                  s"${toLabel(kyuubiUniqueKey)} in app state $appState, marking it as terminated")
                // Only seed the store when nothing is recorded yet for this application.
                if (appInfoStore.get(kyuubiUniqueKey) == null) {
                  updateApplicationState(kubernetesInfo, pod, eventType)
                }
                markApplicationTerminated(kubernetesInfo, pod, eventType)
              }
            } catch {
              case NonFatal(e) =>
                warn(s"[$kubernetesInfo] Failed to process existing pod $podName", e)
            }
          }
        } catch {
          case NonFatal(e) =>
            warn(s"[$kubernetesInfo] Failed to clean up terminated app pods on " +
              "kubernetes client initialization", e)
        }
      }
    })
  }
}
82118

83119
private var metadataManager: Option[MetadataManager] = _
@@ -168,6 +204,9 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
168204
TimeUnit.MILLISECONDS)
169205
cleanupCanceledAppPodExecutor = ThreadUtils.newDaemonCachedThreadPool(
170206
"cleanup-canceled-app-pod-thread")
207+
kubernetesClientInitializeCleanupTerminatedPodExecutor =
208+
ThreadUtils.newDaemonCachedThreadPool(
209+
"kubernetes-client-initialize-cleanup-terminated-pod-thread")
171210
initializeKubernetesClient(kyuubiConf)
172211
}
173212

@@ -321,6 +360,11 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
321360
ThreadUtils.shutdown(cleanupCanceledAppPodExecutor)
322361
cleanupCanceledAppPodExecutor = null
323362
}
363+
364+
if (kubernetesClientInitializeCleanupTerminatedPodExecutor != null) {
365+
ThreadUtils.shutdown(kubernetesClientInitializeCleanupTerminatedPodExecutor)
366+
kubernetesClientInitializeCleanupTerminatedPodExecutor = null
367+
}
324368
}
325369

326370
private class SparkEnginePodEventHandler(kubernetesInfo: KubernetesInfo)

0 commit comments

Comments
 (0)