 
 from __future__ import annotations
 
+import os
+import sys
+
+
+# Add the repo root to sys.path so the monitor script can import tools.stats.utilization_stats_lib.
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
 import argparse
 import copy
 import dataclasses
 import datetime
-import json
+import os
 import signal
 import threading
 import time
 
 import psutil  # type: ignore[import]
 
+from tools.stats.utilization_stats_lib import (
+    getDataModelVersion,
+    GpuUsage,
+    RecordData,
+    UtilizationMetadata,
+    UtilizationRecord,
+    UtilizationStats,
+)
+
 
 _HAS_PYNVML = False
 _HAS_AMDSMI = False
 
-_DATA_MODEL_VERSION = 1.0
+_job_name = os.environ.get("JOB_NAME", "")
+_job_id = os.environ.get("JOB_ID", "")
+_workflow_run_id = os.environ.get("WORKFLOW_RUN_ID", "")
+_workflow_name = os.environ.get("WORKFLOW_NAME", "")
 
 
 @dataclasses.dataclass
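Review note: the path bootstrap above is what lets the script resolve the new `tools.stats.utilization_stats_lib` import when run directly. A minimal sketch of what the inserted entry resolves to, assuming the monitor lives two levels below the repository root (the concrete path here is hypothetical):

```python
import os

# Hypothetical location of this script inside the checkout.
script_path = "/repo/tools/stats/monitor.py"
script_dir = os.path.dirname(script_path)         # /repo/tools/stats
repo_root = os.path.join(script_dir, "..", "..")  # /repo/tools/stats/../..
print(os.path.normpath(repo_root))                # /repo
# With /repo on sys.path, "tools.stats.utilization_stats_lib" imports as a module path.
```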
@@ -164,16 +182,20 @@ def __init__(
         in a pretty format with more information.
         """
         self._log_interval = log_interval
-        self._summary_info = {
-            "level": "metadata",
-            "interval": self._log_interval,
-            "data_model_version": _DATA_MODEL_VERSION,
-        }
+        self._metadata = UtilizationMetadata(
+            level="metadata",
+            usage_collect_interval=self._log_interval,
+            data_model_version=getDataModelVersion(),
+            job_id=_job_id,
+            job_name=_job_name,
+            workflow_id=_workflow_run_id,
+            workflow_name=_workflow_name,
+        )
         self._data_collect_interval = data_collect_interval
         self._has_pynvml = pynvml_enabled
         self._has_amdsmi = amdsmi_enabled
         self._gpu_handles: list[Any] = []
-        self._gpu_libs_detected: list[str] = []
+        self._gpu_lib_detected: str = ""
         self._num_of_cpus = 0
         self._debug_mode = is_debug_mode
         self._initial_gpu_handler()
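The library the metadata object comes from is not part of this diff. A minimal sketch of the shape the constructor call implies, assuming plain dataclasses with a `to_json` helper; the field names are taken from the call sites in this diff, while the defaults and the serialization details are assumptions:

```python
import dataclasses
import json


@dataclasses.dataclass
class UtilizationMetadata:
    # Fields inferred from the constructor call and later attribute writes;
    # defaults here are assumptions, not the library's actual definition.
    level: str = ""
    usage_collect_interval: float = 0.0
    data_model_version: float = 0.0
    job_id: str = ""
    job_name: str = ""
    workflow_id: str = ""
    workflow_name: str = ""
    start_at: float = 0.0
    gpu_type: str = ""
    gpu_count: int = 0
    cpu_count: int = 0
    error: str = ""

    def to_json(self) -> str:
        # Emit one JSON object per call, mirroring the old json.dumps(dict) output.
        return json.dumps(dataclasses.asdict(self))
```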
@@ -210,31 +232,32 @@ def _collect_data(self) -> None:
             finally:
                 time.sleep(self._data_collect_interval)
 
-    def _generate_stats(self, data_list: list[float]) -> dict[str, Any]:
+    def _generate_stats(self, data_list: list[float]) -> UtilizationStats:
         """
         Generate stats from the data list.
         """
         if len(data_list) == 0:
-            return {}
+            return UtilizationStats()
 
         total = sum(data_list)
         avg = total / len(data_list)
         maxi = max(data_list)
-        return {
-            "avg": round(avg, 2),
-            "max": round(maxi, 2),
-        }
+
+        return UtilizationStats(
+            avg=round(avg, 2),
+            max=round(maxi, 2),
+        )
 
     def _output_data(self) -> None:
         """
         output the data.
         """
-        self._summary_info["start_time"] = datetime.datetime.now().timestamp()
-        self.log_json(self._summary_info)
+        self._metadata.start_at = datetime.datetime.now().timestamp()
+        self.log_json(self._metadata.to_json())
 
         while not self.exit_event.is_set():
             collecting_start_time = time.time()
-            stats = {}
+            stats = UtilizationRecord()
             try:
                 data_list, error_list = self.shared_resource.get_and_reset()
                 if self._debug_mode:
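Worked through with hypothetical readings, the typed `_generate_stats` behaves like this (values are illustrative only):

```python
# Three hypothetical CPU utilization samples from one collection window.
data_list = [12.5, 50.0, 37.4]

avg = round(sum(data_list) / len(data_list), 2)  # 33.3
maxi = round(max(data_list), 2)                  # 50.0
# _generate_stats returns UtilizationStats(avg=33.3, max=50.0);
# an empty window now yields a default UtilizationStats() rather than {}.
```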
@@ -252,13 +275,8 @@ def _output_data(self) -> None:
                 if not data_list:
                     # pass since no data is collected
                     continue
-
-                stats.update(
-                    {
-                        "level": "record",
-                        "time": datetime.datetime.now().timestamp(),
-                    }
-                )
+                stats.level = "record"
+                stats.timestamp = datetime.datetime.now().timestamp()
 
                 cpu_stats = self._generate_stats(
                     [data.cpu_percent for data in data_list]
@@ -271,43 +289,35 @@ def _output_data(self) -> None:
                 cmds = {
                     process["cmd"] for data in data_list for process in data.processes
                 }
-                stats.update(
-                    {
-                        "cpu": cpu_stats,
-                        "memory": memory_stats,
-                        "cmds": list(cmds),
-                        "count": len(data_list),
-                    }
-                )
+
+                stats.cmd_names = list(cmds)
+                record = RecordData()
+                record.cpu = cpu_stats
+                record.memory = memory_stats
 
                 # collect gpu metrics
                 if self._has_pynvml or self._has_amdsmi:
                     gpu_list = self._calculate_gpu_utilization(data_list)
-                    stats.update(
-                        {
-                            "gpu_list": gpu_list,
-                        }
-                    )
+                    record.gpu_usage = gpu_list
+                stats.data = record
             except Exception as e:
-                stats = {
-                    "level": "record",
-                    "time": datetime.datetime.now().timestamp(),
-                    "error": str(e),
-                }
+                stats = UtilizationRecord(
+                    level="record",
+                    timestamp=datetime.datetime.now().timestamp(),
+                    error=str(e),
+                )
             finally:
                 collecting_end_time = time.time()
                 time_diff = collecting_end_time - collecting_start_time
                 # verify there is data
-                if stats:
-                    stats["log_duration"] = f"{time_diff * 1000:.2f} ms"
-                    self.log_json(stats)
+                if stats.level:
+                    stats.log_duration = f"{time_diff * 1000:.2f} ms"
+                    self.log_json(stats.to_json())
                 time.sleep(self._log_interval)
         # shut down gpu connections when exiting
         self._shutdown_gpu_connections()
 
-    def _calculate_gpu_utilization(
-        self, data_list: list[UsageData]
-    ) -> list[dict[str, Any]]:
+    def _calculate_gpu_utilization(self, data_list: list[UsageData]) -> list[GpuUsage]:
         """
         Calculates the GPU utilization.
         """
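Putting the record path together, one emitted line plausibly carries the structure below. This is an illustration built from the field names used in this diff; the real serialized keys come from `utilization_stats_lib`, which is not shown here, and every value is made up:

```python
# Hypothetical shape of one serialized UtilizationRecord.
example_record = {
    "level": "record",
    "timestamp": 1718000000.0,
    "cmd_names": ["python test_ops.py"],
    "data": {
        "cpu": {"avg": 33.3, "max": 50.0},
        "memory": {"avg": 41.2, "max": 63.8},
        "gpu_usage": [
            {
                "uuid": "GPU-hypothetical-uuid",
                "util_percent": {"avg": 71.0, "max": 98.0},
                "mem_util_percent": {"avg": 55.5, "max": 80.1},
            }
        ],
    },
    "log_duration": "2.10 ms",
}
```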
@@ -324,11 +334,11 @@ def _calculate_gpu_utilization(
             gpu_util_stats = self._generate_stats(gpu_utilization[gpu_uuid])
             gpu_mem_util_stats = self._generate_stats(gpu_mem_utilization[gpu_uuid])
             calculate_gpu.append(
-                {
-                    "uuid": gpu_uuid,
-                    "util_percent": gpu_util_stats,
-                    "mem_util_percent": gpu_mem_util_stats,
-                }
+                GpuUsage(
+                    uuid=gpu_uuid,
+                    util_percent=gpu_util_stats,
+                    mem_util_percent=gpu_mem_util_stats,
+                )
             )
         return calculate_gpu
 
@@ -348,10 +358,7 @@ def log_json(self, stats: Any) -> None:
         """
         Logs the stats in json format to stdout.
         """
-        if self._debug_mode:
-            print(json.dumps(stats, indent=4))
-            return
-        print(json.dumps(stats))
+        print(stats)
 
     def _collect_gpu_data(self) -> list[GpuData]:
         gpu_data_list = []
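Since serialization now happens at the call sites via `to_json()`, `log_json` receives an already-serialized string and prints it verbatim. One observable change worth flagging in review, sketched below (assuming `to_json()` returns compact JSON):

```python
import json

record = {"level": "record"}
serialized = json.dumps(record)  # what to_json() plausibly returns
print(serialized)                # new behavior: log_json prints the string verbatim
# Old behavior: log_json called json.dumps itself, and in debug mode used
# json.dumps(record, indent=4); the pretty-printed variant is dropped here.
```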
@@ -391,28 +398,25 @@ def _initial_gpu_handler(self) -> None:
         """
         try:
             if self._has_pynvml:
-                self._gpu_libs_detected.append("pynvml")
+                self._gpu_lib_detected = "pynvml"
                 # Todo: investigate if we can use device uuid instead of index.
                 # there is chance that the gpu index can change when the gpu is rebooted.
                 self._gpu_handles = [
                     pynvml.nvmlDeviceGetHandleByIndex(i)
                     for i in range(pynvml.nvmlDeviceGetCount())
                 ]
             if self._has_amdsmi:
-                self._gpu_libs_detected.append("amdsmi")
+                self._gpu_lib_detected = "amdsmi"
                 self._gpu_handles = amdsmi.amdsmi_get_processor_handles()
 
             self._num_of_cpus = psutil.cpu_count(logical=False)
             # update summary info
-            self._summary_info.update(
-                {
-                    "gpu_libs_detected": self._gpu_libs_detected,
-                    "num_of_gpus": len(self._gpu_handles),
-                    "num_of_cpus": self._num_of_cpus,
-                }
-            )
+            self._metadata.gpu_type = self._gpu_lib_detected
+            self._metadata.gpu_count = len(self._gpu_handles)
+            self._metadata.cpu_count = self._num_of_cpus
+
         except Exception as e:
-            self._summary_info["error"] = str(e)
+            self._metadata.error = str(e)
 
     def _shutdown_gpu_connections(self) -> None:
         if self._has_amdsmi:
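One behavioral nuance of the list-to-string switch above: on a host where both GPU stacks were detected, the old list kept both names, while the single string keeps only the last assignment. A sketch of the precedence implied by the check order in `_initial_gpu_handler`:

```python
# Sketch of the detection precedence implied by _initial_gpu_handler.
_gpu_lib_detected = ""
for has_lib, name in [(True, "pynvml"), (True, "amdsmi")]:
    if has_lib:
        _gpu_lib_detected = name
print(_gpu_lib_detected)  # "amdsmi": the later check wins when both are present
```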