33import os
44from datetime import datetime , timedelta , timezone
55from pathlib import Path
6- from typing import Any , Dict , List , Set
6+ from typing import Any , Dict , List , Optional , Set
77
88from app .config import get_settings
99from app .core .logging import logger
10+ from app .runtime_registry import RUNTIME_REGISTRY
1011from app .services .circuit_breaker import CircuitBreaker
1112from app .services .pod_manifest_builder import PodManifestBuilder
1213from fastapi import Depends , Request
@@ -58,9 +59,16 @@ class KubernetesService:
5859 HEALTH_CHECK_INTERVAL = 60
5960 CONTAINER_KUBECONFIG_PATH = "/app/kubeconfig.yaml"
6061
62+ v1 : Optional [k8s_client .CoreV1Api ]
63+ apps_v1 : Optional [k8s_client .AppsV1Api ]
64+ version_api : Optional [k8s_client .VersionApi ]
65+
6166 def __init__ (self , manager : KubernetesServiceManager ):
6267 self .settings = get_settings ()
6368 self .manager = manager
69+ self .v1 = None
70+ self .apps_v1 = None
71+ self .version_api = None
6472 self ._initialize_kubernetes_client ()
6573
6674 self .circuit_breaker = CircuitBreaker ()
@@ -74,6 +82,10 @@ def __del__(self) -> None:
7482 self .manager .unregister (self )
7583
7684 async def check_health (self ) -> bool :
85+ if not self .version_api :
86+ logger .warning ("Kubernetes client not available for health check." )
87+ self ._is_healthy = False
88+ return False
7789 try :
7890 if (datetime .now (timezone .utc ) - self ._last_health_check ).seconds < self .HEALTH_CHECK_INTERVAL :
7991 return self ._is_healthy
@@ -91,7 +103,6 @@ async def check_health(self) -> bool:
91103
92104 async def graceful_shutdown (self ) -> None :
93105 shutdown_deadline = datetime .now (timezone .utc ) + timedelta (seconds = self .SHUTDOWN_TIMEOUT )
94- # Make a copy of keys to avoid modification during iteration issues
95106 for pod_name in list (self ._active_pods .keys ()):
96107 if datetime .now (timezone .utc ) > shutdown_deadline :
97108 logger .warning ("Shutdown timeout reached, forcing pod termination" )
@@ -109,12 +120,14 @@ def _initialize_kubernetes_client(self) -> None:
109120 try :
110121 self ._setup_kubernetes_config ()
111122 self .v1 = k8s_client .CoreV1Api ()
123+ self .apps_v1 = k8s_client .AppsV1Api ()
112124 self .version_api = k8s_client .VersionApi ()
113125 self ._test_api_connection ()
114126 logger .info ("Kubernetes client initialized successfully." )
115127 except Exception as e :
116128 logger .error (f"Failed to initialize Kubernetes client: { str (e )} " )
117129 self .v1 = None
130+ self .apps_v1 = None
118131 self .version_api = None
119132 raise KubernetesConfigError (f"Failed to initialize Kubernetes client: { str (e )} " ) from e
120133
@@ -137,6 +150,8 @@ def _setup_kubernetes_config(self) -> None:
137150 logger .info (f"Kubernetes client configured for host: { default_config .host } " )
138151
139152 def _test_api_connection (self ) -> None :
153+ if not self .version_api :
154+ raise KubernetesConfigError ("VersionAPI client not initialized." )
140155 try :
141156 version = self .version_api .get_code ()
142157 logger .info (f"Successfully connected to Kubernetes API. Server version: { version .git_version } " )
@@ -183,6 +198,7 @@ async def create_execution_pod(
183198 pod_memory_limit = self .settings .K8S_POD_MEMORY_LIMIT ,
184199 pod_memory_request = self .settings .K8S_POD_MEMORY_REQUEST ,
185200 pod_execution_timeout = self .settings .K8S_POD_EXECUTION_TIMEOUT ,
201+ priority_class_name = self .settings .K8S_POD_PRIORITY_CLASS_NAME ,
186202 )
187203 pod_manifest = builder .build ()
188204 await self ._create_namespaced_pod (pod_manifest )
@@ -207,7 +223,6 @@ async def get_pod_logs(self, execution_id: str) -> tuple[dict, str]:
207223 logger .info (f"Raw logs from pod { pod_name } :\n ---\n { full_logs } \n ---" )
208224
209225 try :
210- # https://stackoverflow.com/questions/15197673/using-pythons-eval-vs-ast-literal-eval
211226 metrics = ast .literal_eval (full_logs )
212227 return metrics , pod_phase
213228 except (ValueError , SyntaxError , TypeError ) as e :
@@ -226,6 +241,8 @@ async def get_pod_logs(self, execution_id: str) -> tuple[dict, str]:
226241 self ._active_pods .pop (execution_id , None )
227242
228243 async def _wait_for_pod_completion (self , pod_name : str ) -> k8s_client .V1Pod :
244+ if not self .v1 :
245+ raise KubernetesServiceError (_K8S_CLIENT_NOT_INITIALIZED_MSG )
229246 logger .info (f"Waiting for pod '{ pod_name } ' to complete..." )
230247 for _ in range (self .POD_RETRY_ATTEMPTS ):
231248 try :
@@ -242,6 +259,8 @@ async def _wait_for_pod_completion(self, pod_name: str) -> k8s_client.V1Pod:
242259 raise KubernetesPodError (f"Timeout waiting for pod '{ pod_name } ' to complete." )
243260
244261 async def _get_container_logs (self , pod_name : str , container_name : str ) -> str :
262+ if not self .v1 :
263+ return f"Error: { _K8S_CLIENT_NOT_INITIALIZED_MSG } "
245264 try :
246265 return await asyncio .to_thread (
247266 self .v1 .read_namespaced_pod_log ,
@@ -254,6 +273,8 @@ async def _get_container_logs(self, pod_name: str, container_name: str) -> str:
254273 return f"Error retrieving logs: { e .reason } "
255274
256275 async def _create_config_map (self , config_map : k8s_client .V1ConfigMap ) -> None :
276+ if not self .v1 :
277+ raise KubernetesServiceError (_K8S_CLIENT_NOT_INITIALIZED_MSG )
257278 try :
258279 await asyncio .to_thread (self .v1 .create_namespaced_config_map , namespace = self .NAMESPACE , body = config_map )
259280 logger .info (f"ConfigMap '{ config_map .metadata .name } ' created successfully." )
@@ -262,6 +283,8 @@ async def _create_config_map(self, config_map: k8s_client.V1ConfigMap) -> None:
262283 raise KubernetesServiceError (f"Failed to create ConfigMap: { str (e )} " ) from e
263284
264285 async def _create_namespaced_pod (self , pod_manifest : Dict [str , Any ]) -> None :
286+ if not self .v1 :
287+ raise KubernetesPodError (_K8S_CLIENT_NOT_INITIALIZED_MSG )
265288 pod_name = pod_manifest .get ("metadata" , {}).get ("name" , "unknown-pod" )
266289 try :
267290 await asyncio .to_thread (self .v1 .create_namespaced_pod , body = pod_manifest , namespace = self .NAMESPACE )
@@ -286,6 +309,80 @@ async def _cleanup_resources(self, pod_name: str, config_map_name: str) -> None:
286309 except ApiException as e :
287310 logger .error (f"Failed to delete config map '{ config_map_name } ': { e .reason } " )
288311
312+ # DaemonSet: https://kubernetes.io/docs/concepts/workloads/controllers/daemonset/
async def ensure_image_pre_puller_daemonset(self) -> None:
    """Create or replace the DaemonSet that pre-pulls every runtime image.

    One init container is generated per distinct image found in
    ``RUNTIME_REGISTRY``; each one only echoes a message, so its sole effect
    is to make the node pull the image.  A ``pause`` container keeps the pod
    alive once all init containers have finished.

    The method is best-effort: if the AppsV1 client is missing it logs a
    warning and returns, and any error raised while building or applying
    the manifest is logged and swallowed rather than propagated.
    """
    if not self.apps_v1:
        logger.warning("Kubernetes AppsV1Api client not initialized. Skipping DaemonSet creation.")
        return

    daemonset_name = "runtime-image-pre-puller"
    namespace = self.NAMESPACE
    # NOTE(review): fixed 5 s pause before touching the API; the reason is not
    # visible in this block (presumably lets startup settle) - confirm.
    await asyncio.sleep(5)

    # Kubernetes container names must be RFC 1123 DNS labels: lowercase
    # alphanumerics or '-', at most 63 characters, alphanumeric at both ends.
    max_container_name_len = 63

    try:
        all_images = {
            config.image
            for lang in RUNTIME_REGISTRY.values()
            for config in lang.values()
        }

        init_containers = []
        # Sorted so the generated manifest is stable between runs.
        for i, image_ref in enumerate(sorted(all_images)):
            # Keep only the last path segment, lower-case it and map every
            # character that is not an ASCII letter/digit to '-'.  This covers
            # ':', '.', '_' as before, plus '@' (digest refs) and upper-case tags.
            image_tail = image_ref.split("/")[-1].lower()
            sanitized_image_ref = "".join(
                ch if (ch.isascii() and ch.isalnum()) else "-" for ch in image_tail
            )
            logger.info(f"DAEMONSET: before: {image_ref} -> {sanitized_image_ref}")
            # The index prefix keeps names unique even after truncation; the
            # rstrip guarantees the label does not end with '-'.
            container_name = f"pull-{i}-{sanitized_image_ref}"[:max_container_name_len].rstrip("-")
            init_containers.append({
                "name": container_name,
                "image": image_ref,
                "command": ["/bin/sh", "-c", f'echo "Image {image_ref} pulled."'],
                "imagePullPolicy": "Always",
            })

        manifest: Dict[str, Any] = {
            "apiVersion": "apps/v1",
            "kind": "DaemonSet",
            "metadata": {"name": daemonset_name, "namespace": namespace},
            "spec": {
                "selector": {"matchLabels": {"name": daemonset_name}},
                "template": {
                    "metadata": {"labels": {"name": daemonset_name}},
                    "spec": {
                        "initContainers": init_containers,
                        "containers": [{
                            "name": "pause",
                            "image": "registry.k8s.io/pause:3.9"
                        }],
                        # Tolerate every taint so the pod is scheduled on all nodes.
                        "tolerations": [{"operator": "Exists"}]
                    }
                },
                "updateStrategy": {"type": "RollingUpdate"}
            }
        }

        try:
            # Probe for an existing DaemonSet; a 404 means it must be created.
            await asyncio.to_thread(
                self.apps_v1.read_namespaced_daemon_set,
                name=daemonset_name,
                namespace=namespace,
            )
            logger.info(f"DaemonSet '{daemonset_name}' exists. Replacing to ensure it is up-to-date.")
            await asyncio.to_thread(
                self.apps_v1.replace_namespaced_daemon_set,
                name=daemonset_name, namespace=namespace, body=manifest
            )
            logger.info(f"DaemonSet '{daemonset_name}' replaced successfully.")
        except ApiException as e:
            if e.status == 404:
                logger.info(f"DaemonSet '{daemonset_name}' not found. Creating...")
                await asyncio.to_thread(
                    self.apps_v1.create_namespaced_daemon_set, namespace=namespace, body=manifest
                )
                logger.info(f"DaemonSet '{daemonset_name}' created successfully.")
            else:
                # Any other API failure is handled by the outer handler below.
                raise

    except ApiException as e:
        logger.error(f"K8s API error applying DaemonSet '{daemonset_name}': {e.reason}", exc_info=True)
    except Exception as e:
        # Deliberate catch-all: pre-pulling is an optimisation and must not
        # take the caller down.
        logger.error(f"Unexpected error applying image-puller DaemonSet: {e}", exc_info=True)
385+
289386
290387def get_k8s_manager (request : Request ) -> KubernetesServiceManager :
291388 if not hasattr (request .app .state , "k8s_manager" ):
0 commit comments