55import asyncio
66import os
77from dataclasses import dataclass
8+ from datetime import datetime , timezone
89from pathlib import Path
9- from typing import Optional , Dict
10+ from typing import Optional , Dict , List
1011
1112import structlog
1213
@@ -211,18 +212,19 @@ async def cleanup_job_cgroup(self, job_id: int) -> None:
211212 job_id = job_id ,
212213 error = str (exc ))
213214
214- async def update_metrics (self , job_id : int , executor_name : str ) -> None :
215+ async def update_metrics (self , job_id : int , executor_name : str ) -> Optional [ ResourceUsage ] :
215216 """Update Prometheus metrics for a job."""
216217 usage = await self .get_job_usage (job_id )
217218 if not usage :
218- return
219-
219+ return None
220+
220221 labels = [str (job_id ), executor_name ]
221222 self ._cpu_usage_gauge .set (usage .cpu_seconds , labels = labels )
222223 self ._memory_usage_gauge .set (usage .memory_bytes , labels = labels )
223-
224+
224225 # Counters need to track deltas, but for simplicity we'll just use current values
225226 # In production, we'd track previous values and report deltas
227+ return usage
226228
227229
228230class ResourceTracker :
@@ -232,6 +234,7 @@ def __init__(self) -> None:
232234 self ._cgroup_manager = CGroupManager ()
233235 self ._tracking_tasks : Dict [int , asyncio .Task ] = {}
234236 self ._running = False
237+ self ._usage_history : Dict [int , List [dict [str , float | str ]]] = {}
235238
236239 async def start (self ) -> None :
237240 """Start the resource tracking system."""
@@ -293,9 +296,10 @@ async def stop_job_tracking(self, job_id: int) -> None:
293296 await task
294297 except asyncio .CancelledError :
295298 pass
296-
299+
297300 # Clean up cgroup
298301 await self ._cgroup_manager .cleanup_job_cgroup (job_id )
302+ self ._usage_history .pop (job_id , None )
299303
300304 async def add_process (self , job_id : int , pid : int ) -> None :
301305 """Add a process to job tracking."""
@@ -304,15 +308,29 @@ async def add_process(self, job_id: int, pid: int) -> None:
304308 async def get_usage (self , job_id : int ) -> Optional [ResourceUsage ]:
305309 """Get resource usage for a job."""
306310 return await self ._cgroup_manager .get_job_usage (job_id )
311+
312+ def get_usage_history (self , job_id : int ) -> List [dict [str , float | str ]]:
313+ return list (self ._usage_history .get (job_id , []))
307314
308315 async def _track_job_metrics (self , job_id : int , executor_name : str ) -> None :
309316 """Periodically update metrics for a job."""
310317 while self ._running :
311318 try :
312- await self ._cgroup_manager .update_metrics (job_id , executor_name )
319+ usage = await self ._cgroup_manager .update_metrics (job_id , executor_name )
320+ if usage :
321+ history = self ._usage_history .setdefault (job_id , [])
322+ history .append (
323+ {
324+ "ts" : datetime .now (timezone .utc ).isoformat (),
325+ "cpu_seconds" : usage .cpu_seconds ,
326+ "memory_bytes" : usage .memory_bytes ,
327+ }
328+ )
329+ if len (history ) > 120 :
330+ history .pop (0 )
313331 except Exception as exc :
314- LOGGER .warning ("Metrics update failed" ,
332+ LOGGER .warning ("Metrics update failed" ,
315333 job_id = job_id ,
316334 error = str (exc ))
317-
335+
318336 await asyncio .sleep (5 ) # Update every 5 seconds
0 commit comments