88from app .core .logging import logger
99from app .core .metrics import (
1010 ACTIVE_EXECUTIONS ,
11+ CPU_UTILIZATION ,
1112 ERROR_COUNTER ,
1213 EXECUTION_DURATION ,
1314 MEMORY_USAGE ,
15+ MEMORY_UTILIZATION_PERCENT ,
16+ POD_CREATION_FAILURES ,
17+ QUEUE_DEPTH ,
18+ QUEUE_WAIT_TIME ,
1419 SCRIPT_EXECUTIONS ,
1520)
1621from app .db .repositories .execution_repository import (
2530 get_kubernetes_service ,
2631)
2732from fastapi import Depends
28- from kubernetes .client .rest import ApiException
2933
3034
3135class ExecutionStatus (str , Enum ):
@@ -53,8 +57,6 @@ async def get_k8s_resource_limits(self) -> Dict[str, Any]:
5357 "supported_runtimes" : self .settings .SUPPORTED_RUNTIMES ,
5458 }
5559
56- # for whatever reason mypy is dumb and can't defer type of EXAMPLE_SCRIPTS
57- # -> ignoring type
# Return the example-scripts mapping exactly as it is stored on the settings object.
# The `type: ignore` on the return silences the type checker, which reportedly
# cannot infer the type of EXAMPLE_SCRIPTS.
5860 async def get_example_scripts (self ) -> Dict [str , str ]:
5961 return self .settings .EXAMPLE_SCRIPTS # type: ignore
6062
@@ -89,11 +91,11 @@ async def _mark_running_when_scheduled(
8991 )
9092
9193 async def _start_k8s_execution (
92- self ,
93- execution_id_str : str ,
94- script : str ,
95- lang : str ,
96- lang_version : str ,
94+ self ,
95+ execution_id_str : str ,
96+ script : str ,
97+ lang : str ,
98+ lang_version : str ,
9799 ) -> None :
98100 """
99101 1. Ask KubernetesService to create the Pod.
@@ -111,7 +113,6 @@ async def _start_k8s_execution(
111113 config_map_data = {runtime_cfg .file_name : script },
112114 )
113115
114- # Start background poller ⤵ — we do **not** await it here
115116 asyncio .create_task (
116117 self ._mark_running_when_scheduled (pod_name , execution_id_str )
117118 )
@@ -123,6 +124,9 @@ async def _start_k8s_execution(
123124
124125 except Exception as e :
125126 logger .error (f"Failed to request K8s pod creation: { str (e )} " , exc_info = True )
127+
128+ POD_CREATION_FAILURES .labels (failure_reason = type (e ).__name__ ).inc ()
129+
126130 await self .execution_repo .update_execution (
127131 execution_id_str ,
128132 ExecutionUpdate (
@@ -132,34 +136,6 @@ async def _start_k8s_execution(
132136 )
133137 raise IntegrationException (status_code = 500 , detail = "Container creation failed" ) from e
134138
135- async def _get_k8s_execution_output (
136- self ,
137- execution_id_str : str
138- ) -> tuple [Optional [str ], Optional [str ], Optional [str ], Optional [dict ]]:
139- output : Optional [str ] = None
140- error_msg : Optional [str ] = None
141- # assume error unless the try succeeds
142- phase : Optional [str ] = ExecutionStatus .ERROR
143- resource_usage : Optional [dict ] = None
144-
145- try :
146- output , phase , resource_usage = await self .k8s_service .get_pod_logs (execution_id_str )
147- logger .info (
148- f"Retrieved K8s results for { execution_id_str } . "
149- f"Phase: { phase } . Resource usage found: { resource_usage is not None } "
150- )
151- except KubernetesPodError as e :
152- error_msg = str (e )
153- logger .error (f"Error retrieving pod results for { execution_id_str } : { error_msg } " )
154- except ApiException as e :
155- error_msg = f"Kubernetes API error for { execution_id_str } : { e .status } { e .reason } "
156- logger .error (error_msg , exc_info = True )
157- except Exception as e :
158- error_msg = f"Unexpected error retrieving K8s results for { execution_id_str } : { e } "
159- logger .error (error_msg , exc_info = True )
160-
161- return output , error_msg , phase , resource_usage
162-
163139 async def _try_finalize_execution (self , execution : ExecutionInDB ) -> Optional [ExecutionInDB ]:
164140 try :
165141 metrics , final_phase = await self .k8s_service .get_pod_logs (execution .id )
@@ -175,33 +151,35 @@ async def _try_finalize_execution(self, execution: ExecutionInDB) -> Optional[Ex
175151 logger .info (f"Successfully parsed metrics from pod: { metrics } " )
176152
177153 exit_code = metrics .get ("exit_code" )
178- res_usage = metrics .get ("resource_usage" )
179-
180- if not res_usage :
181- return None # waiting for results
154+ res_usage = metrics .get ("resource_usage" , {})
182155
183- wall_s = res_usage .get ("execution_time_wall_seconds" ) or 0
156+ wall_s = res_usage .get ("execution_time_wall_seconds" ) or 0.0
184157 jiffies = float (res_usage .get ("cpu_time_jiffies" , 0 ) or 0 )
185158 hertz = float (res_usage .get ("clk_tck_hertz" , 100 ) or 100 )
186- cpu_s = jiffies / hertz if hertz > 0 else 0.0 # total CPU-time
159+ cpu_s = jiffies / hertz if hertz > 0 else 0.0
187160
188- # average CPU in millicores: (CPU-seconds / wall-seconds) × 1000
189161 cpu_millicores = (cpu_s / wall_s * 1000 ) if wall_s else 0.0
190-
191- # VmHWM is k*ibi*bytes → MiB = KiB / 1024
192162 peak_kib = float (res_usage .get ("peak_memory_kb" , 0 ) or 0 )
193163 peak_mib = peak_kib / 1024.0
194164
195- MEMORY_USAGE .labels (lang_and_version = execution .lang + "-" + execution .lang_version ).set (
196- peak_mib * 1024 * 1024 )
165+ lang_and_version : str = f"{ execution .lang } -{ execution .lang_version } "
166+
167+ EXECUTION_DURATION .labels (lang_and_version = lang_and_version ).observe (wall_s )
168+ MEMORY_USAGE .labels (lang_and_version = lang_and_version ).set (peak_mib * 1024 * 1024 )
169+ CPU_UTILIZATION .labels (lang_and_version = lang_and_version ).set (cpu_millicores )
170+
171+ # In settings, the pod memory limit is a string of the form <digits><2-letter unit>; the slice below drops the unit and the remaining number is treated as MiB.
172+ memory_limit_mib = float (self .settings .K8S_POD_MEMORY_LIMIT [:- 2 ])
173+ mem_util_pct = (peak_mib / memory_limit_mib ) * 100
174+ MEMORY_UTILIZATION_PERCENT .labels (lang_and_version = lang_and_version ).set (mem_util_pct )
197175
198176 final_resource_usage = {
199177 "execution_time" : round (wall_s , 6 ),
200178 "cpu_usage" : round (cpu_millicores , 2 ),
201179 "memory_usage" : round (peak_mib , 2 ),
180+ "pod_phase" : final_phase ,
202181 }
203182
204- final_resource_usage ["pod_phase" ] = final_phase
205183 status : ExecutionStatus = ExecutionStatus .COMPLETED if exit_code == 0 else ExecutionStatus .ERROR
206184 if status == ExecutionStatus .ERROR :
207185 ERROR_COUNTER .labels (error_type = "NonZeroExitCode" ).inc ()
@@ -223,8 +201,8 @@ async def _try_finalize_execution(self, execution: ExecutionInDB) -> Optional[Ex
223201 raise IntegrationException (status_code = 500 , detail = "Failed to retrieve execution after update." )
224202
225203 status_label = "success" if updated_execution .status == ExecutionStatus .COMPLETED else "error"
226- SCRIPT_EXECUTIONS . labels ( status = status_label ,
227- lang_and_version = execution . lang + "-" + execution . lang_version ).inc ()
204+ lang_version_label = f" { updated_execution . lang } - { updated_execution . lang_version } "
205+ SCRIPT_EXECUTIONS . labels ( status = status_label , lang_and_version = lang_version_label ).inc ()
228206 if status_label == "error" :
229207 ERROR_COUNTER .labels (error_type = "ScriptExecutionError" ).inc ()
230208
@@ -236,33 +214,37 @@ async def execute_script(
236214 lang_version : str = "3.11"
237215 ) -> ExecutionInDB :
238216 ACTIVE_EXECUTIONS .inc ()
239- start_time = time ()
217+ QUEUE_DEPTH . inc ()
240218 inserted_oid = None
219+ lang_and_version = f"{ lang } -{ lang_version } "
220+
221+ start_time = time ()
241222
242223 try :
243- if lang not in self .settings .SUPPORTED_RUNTIMES . keys () :
224+ if lang not in self .settings .SUPPORTED_RUNTIMES :
244225 raise IntegrationException (status_code = 400 , detail = f"Language '{ lang } ' not supported." )
245226
246227 if lang_version not in self .settings .SUPPORTED_RUNTIMES [lang ]:
247228 raise IntegrationException (status_code = 400 , detail = f"Language version '{ lang_version } ' not supported." )
248229
249230 execution_create = ExecutionCreate (
250- script = script ,
251- lang = lang ,
252- lang_version = lang_version ,
253- status = ExecutionStatus .QUEUED ,
231+ script = script , lang = lang , lang_version = lang_version , status = ExecutionStatus .QUEUED
254232 )
255233 execution_to_insert = ExecutionInDB (** execution_create .model_dump ())
256234 inserted_oid = await self .execution_repo .create_execution (execution_to_insert )
257235 logger .info (f"Created execution record { inserted_oid } with status QUEUED." )
258236
259237 await self ._start_k8s_execution (
260- execution_id_str = inserted_oid , script = script ,
261- lang = lang , lang_version = lang_version
238+ execution_id_str = str (inserted_oid ), script = script , lang = lang , lang_version = lang_version
262239 )
240+
241+ queue_wait_duration = time () - start_time
242+ QUEUE_WAIT_TIME .labels (lang_and_version = lang_and_version ).observe (queue_wait_duration )
243+
244+ # Allow a brief moment for the background poller to potentially catch the Running state
263245 await asyncio .sleep (0.1 )
264246
265- final_execution_state = await self .execution_repo .get_execution (inserted_oid )
247+ final_execution_state = await self .execution_repo .get_execution (str ( inserted_oid ) )
266248 if not final_execution_state :
267249 raise IntegrationException (status_code = 500 , detail = "Failed to retrieve execution record after creation" )
268250 return final_execution_state
@@ -273,14 +255,16 @@ async def execute_script(
273255 if inserted_oid :
274256 await self .execution_repo .update_execution (
275257 str (inserted_oid ),
276- ExecutionUpdate (status = ExecutionStatus .ERROR ,
277- errors = "Script execution failed" ). model_dump ( exclude_unset = True )
258+ ExecutionUpdate (status = ExecutionStatus .ERROR , errors = "Script execution failed" ). model_dump (
259+ exclude_unset = True )
278260 )
279- raise IntegrationException (status_code = 500 ,
280- detail = "Script execution failed" ) from e
261+ raise IntegrationException (status_code = 500 , detail = "Script execution failed" ) from e
281262 finally :
282- EXECUTION_DURATION . labels ( lang_and_version = lang + "-" + lang_version ). observe ( time () - start_time )
263+ # These metrics track the overall API call, not just script runtime
283264 ACTIVE_EXECUTIONS .dec ()
265+ # QUEUE_DEPTH is decremented here to ensure it's always called once,
266+ # avoiding the previous double-decrement bug.
267+ QUEUE_DEPTH .dec ()
284268
285269 async def get_execution_result (self , execution_id : str ) -> ExecutionInDB :
286270 execution = await self .execution_repo .get_execution (execution_id )
0 commit comments