 import json
 import re
 from pathlib import Path
-from typing import Any, Dict, List, Literal, Optional, Set, cast
+from typing import Any, Dict, List, Literal, Optional, cast
 
 import pandas as pd
 
@@ -46,6 +46,10 @@ def __init__(self, config: AnalysisConfig):
         self._setup_output_directories()
         self._validate_and_log_structure()
 
+        # Cache for tensor parallel size (parsed once per benchmark run)
+        self._tensor_parallel_cache: Optional[int] = None
+        self._benchmark_run_log_path: Optional[Path] = None
+
         logger.info("Analyzer initialized with custom filename formats")
         logger.info(f"Loaded {len(self.filename_formats)} filename formats")
         logger.info(f"Input directory: {self.config.input_dir}")
@@ -54,6 +58,30 @@ def __init__(self, config: AnalysisConfig):
5458 logger .info (f"Monitoring directory: { self .monitoring_dir } " )
5559 logger .info (f"Output directory: { self .config .output_dir } " )
5660
61+ def _get_tensor_parallel_size (self , benchmark_run_log : Optional [Path ]) -> Optional [int ]:
62+ """Get tensor parallel size with caching optimization."""
63+ # If already parsed this log file, return cached value
64+ if (
65+ self ._benchmark_run_log_path == benchmark_run_log
66+ and self ._tensor_parallel_cache is not None
67+ ):
68+ return self ._tensor_parallel_cache
69+
70+ # Parse tensor parallel size from log (only once per benchmark run)
71+ if benchmark_run_log and benchmark_run_log .exists ():
72+ tensor_parallel_size = self ._extract_tensor_parallel_from_log (benchmark_run_log )
73+
74+ # Cache the result
75+ self ._tensor_parallel_cache = tensor_parallel_size
76+ self ._benchmark_run_log_path = benchmark_run_log
77+
78+ logger .info (
79+ f"Cached tensor_parallel_size={ tensor_parallel_size } from { benchmark_run_log .name } "
80+ )
81+ return tensor_parallel_size
82+
83+ return None
84+
5785 def _get_results_directory (self ) -> Path :
5886 """Get the directory containing JSON result files"""
5987 if self .config .results_subdir :
@@ -116,10 +144,11 @@ def discover_experiment_files(self) -> List[ExperimentFiles]:
         structures and provides filtering options based on completeness requirements.
 
         The discovery process follows these steps:
-        1. Locate all JSON result files matching the configured pattern
-        2. For each result file, search for corresponding monitoring files
-        3. Build ExperimentFiles objects containing all related file paths
-        4. Apply filtering based on completeness requirements if specified
+        1. Try to discover the master benchmark run log
+        2. Locate all JSON result files matching the configured pattern
+        3. For each result file, search for corresponding monitoring files
+        4. Build ExperimentFiles objects containing all related file paths
+        5. Apply filtering based on completeness requirements if specified
 
         Returns:
             List[ExperimentFiles]: A list of ExperimentFiles objects, each containing:
@@ -151,13 +180,16 @@ def discover_experiment_files(self) -> List[ExperimentFiles]:
         """
         logger.info("Discovering experiment files...")
 
+        # First, find the master benchmark run log
+        benchmark_run_log = self._find_benchmark_run_log()
+
         # Get all JSON result files
         result_files = list(self.results_dir.glob(self.config.results_pattern))
         experiments = []
 
         for result_file in result_files:
             try:
-                experiment = self._build_experiment_files(result_file)
+                experiment = self._build_experiment_files(result_file, benchmark_run_log)
                 experiments.append(experiment)
 
                 # Log what we found for this experiment
@@ -178,7 +210,26 @@ def discover_experiment_files(self) -> List[ExperimentFiles]:
178210 logger .info (f"Discovered { len (experiments )} experiment file sets" )
179211 return experiments
180212
181- def _build_experiment_files (self , result_file : Path ) -> ExperimentFiles :
213+ def _find_benchmark_run_log (self ) -> Optional [Path ]:
214+ """Find the master benchmark run log file."""
215+ if not self .logs_dir or not self .logs_dir .exists ():
216+ return None
217+
218+ # Look for files matching pattern: benchmark_run_*.log
219+ benchmark_logs = list (self .logs_dir .glob ("benchmark_run_*.log" ))
220+
221+ if benchmark_logs :
222+ # If multiple logs exist, take the most recent one
223+ latest_log = max (benchmark_logs , key = lambda x : x .stat ().st_mtime )
224+ logger .info (f"Found master benchmark log: { latest_log .name } " )
225+ return latest_log
226+
227+ logger .warning ("No master benchmark run log found" )
228+ return None
229+
230+ def _build_experiment_files (
231+ self , result_file : Path , benchmark_run_log : Optional [Path ]
232+ ) -> ExperimentFiles :
182233 """Build ExperimentFiles object by finding matching files."""
183234 # Extract the base pattern from the result filename
184235 base_name = result_file .stem # Remove .json extension
@@ -212,6 +263,7 @@ def _build_experiment_files(self, result_file: Path) -> ExperimentFiles:
 
         return ExperimentFiles(
             result_file=result_file,
+            benchmark_run_log=benchmark_run_log,  # Shared across all experiments
             log_file=log_file,
             cpu_metrics_file=cpu_metrics_file,
             gpu_power_file=gpu_power_file,
@@ -562,6 +614,13 @@ def _process_monitoring_data(self) -> Dict[str, pd.DataFrame]:
         thermal_analysis = []
         power_analysis = []
 
+        # Get tensor parallel size once for the entire benchmark run
+        benchmark_run_log = None
+        if self.experiment_files:
+            benchmark_run_log = self.experiment_files[0].benchmark_run_log
+
+        tensor_parallel_size = self._get_tensor_parallel_size(benchmark_run_log)
+
         for experiment in self.experiment_files:
             try:
                 monitoring_data = self.load_monitoring_data(experiment)
@@ -581,23 +640,91 @@ def _process_monitoring_data(self) -> Dict[str, pd.DataFrame]:
                 # Enhanced power analysis
                 if "gpu_power" in monitoring_data:
                     power_data = self._analyze_power_efficiency(
-                        experiment, monitoring_data["gpu_power"]
+                        experiment=experiment,
+                        power_df=monitoring_data["gpu_power"],
+                        tensor_parallel_size=tensor_parallel_size,
                     )
                     power_analysis.append(power_data)
 
             except Exception as e:
                 logger.error(f"Error processing monitoring for {experiment.result_file.name}: {e}")
 
+        gpu_allocation_summary = self._analyze_gpu_device_allocation(power_analysis)
+
         # Export comprehensive monitoring analysis
         return self._export_monitoring_analysis(
-            monitoring_summaries, thermal_analysis, power_analysis
+            monitoring_summaries, thermal_analysis, power_analysis, gpu_allocation_summary
         )
 
+    @staticmethod
+    def _analyze_gpu_device_allocation(power_analysis: List[Dict[str, Any]]) -> Dict[str, Any]:
+        """Analyze GPU device allocation patterns across experiments."""
+        if not power_analysis:
+            return {}
+
+        # Get unique devices across all experiments
+        all_allocated_devices = []
+        device_usage_count: Dict[str, int] = {}
+
+        for experiment_data in power_analysis:
+            if (
+                "allocated_gpu_devices" in experiment_data
+                and experiment_data["allocated_gpu_devices"]
+            ):
+                devices = experiment_data["allocated_gpu_devices"]
+                all_allocated_devices.extend(devices)
+
+                # Count usage per device
+                for device in devices:
+                    device_usage_count[device] = device_usage_count.get(device, 0) + 1
+
+        unique_devices = list(set(all_allocated_devices))
+
+        allocation_summary = {
+            "unique_allocated_devices": unique_devices,
+            "total_unique_devices": len(unique_devices),
+            "device_usage_frequency": device_usage_count,
+            "most_used_device": (
+                max(device_usage_count.items(), key=lambda x: x[1]) if device_usage_count else None
+            ),
+        }
+
+        logger.info(f"GPU allocation analysis: {allocation_summary}")
+        return allocation_summary
+
+    @staticmethod
+    def _extract_tensor_parallel_from_log(benchmark_run_log: Optional[Path]) -> Optional[int]:
697+ """Extract tensor parallel size from master benchmark run log for specific experiment."""
+        if not benchmark_run_log or not benchmark_run_log.exists():
+            return None
+
+        try:
+            with open(benchmark_run_log, "r", encoding="utf-8") as f:
+                # Just find the first occurrence of "Tensor Parallel:"
+                # since it's consistent across all experiments
+                for line in f:
+                    if "Tensor Parallel:" in line:
+                        match = re.search(r"Tensor Parallel:\s*(\d+)", line)
+                        if match:
+                            tensor_parallel = int(match.group(1))
+                            logger.debug(
+                                f"Found tensor_parallel_size={tensor_parallel} (consistent across all experiments)"
+                            )
+                            return tensor_parallel
+
+            logger.warning("No 'Tensor Parallel:' line found in benchmark log")
+            return None
+
+        except Exception as e:
+            logger.warning(f"Failed to parse tensor parallel size from {benchmark_run_log}: {e}")
+            return None
+
     def _export_monitoring_analysis(
         self,
         monitoring_summaries: List[Dict[str, Any]],
         thermal_analysis: List[Dict[str, Any]],
         power_analysis: List[Dict[str, Any]],
+        gpu_allocation_summary: Dict[str, Any],
     ) -> Dict[str, pd.DataFrame]:
         """Export comprehensive monitoring analysis to files."""
 
@@ -623,10 +750,21 @@ def _export_monitoring_analysis(
             power_df.to_csv(power_file, index=False)
             logger.info(f"Power analysis exported to {power_file}")
 
+        # Export GPU allocation summary
+        if gpu_allocation_summary:
+            allocation_file = self.config.output_dir / "tables" / "gpu_allocation_summary.csv"
+            # Convert summary to DataFrame for consistent export
+            allocation_df = pd.DataFrame([gpu_allocation_summary])
+            allocation_df.to_csv(allocation_file, index=False)
+            logger.info(f"GPU allocation summary exported to {allocation_file}")
+
         return {
             "monitoring_summary": monitoring_df,  # Aggregated monitoring metrics
             "thermal_analysis": thermal_df,  # Temperature analysis data
             "power_analysis": power_df,  # Power consumption analysis
+            "gpu_allocation_summary": (
+                pd.DataFrame([gpu_allocation_summary]) if gpu_allocation_summary else pd.DataFrame()
+            ),
         }
 
     @staticmethod
@@ -647,7 +785,9 @@ def _analyze_thermal_performance(
 
     @staticmethod
     def _analyze_power_efficiency(
-        experiment: ExperimentFiles, power_df: pd.DataFrame, active_gpus: Optional[Set[str]] = None
+        experiment: ExperimentFiles,
+        power_df: pd.DataFrame,
+        tensor_parallel_size: Optional[int] = None,
     ) -> Dict[str, Any]:
         """Analyze power consumption efficiency."""
         total_power_series = power_df.groupby("timestamp")["power_watts"].sum()
@@ -662,25 +802,48 @@ def _analyze_power_efficiency(
             ),  # Per GPU average
             "power_stability": total_power_series.std(),
             "num_gpus_monitored": num_gpus_monitored,
+            "tensor_parallel_size": tensor_parallel_size or 1,  # Store for reference
         }
 
-        # If we know which GPUs were active, calculate active-only metrics
-        if active_gpus:
-            active_power_series = (
-                power_df[power_df["device"].isin(active_gpus)]
-                .groupby("timestamp")["power_watts"]
-                .sum()
-            )
-            results.update(
-                {
-                    "avg_active_power": active_power_series.mean(),
-                    "power_efficiency_active": (
-                        active_power_series.mean() / len(active_gpus) if active_gpus else 0.0
-                    ),
-                    "num_active_gpus": len(active_gpus),
-                }
+        # If we know tensor parallel size, calculate allocated GPU metrics
+        if tensor_parallel_size and tensor_parallel_size > 0:
+            # Approach: Identify most active GPUs based on power consumption
+            gpu_avg_power = (
+                power_df.groupby("device")["power_watts"].mean().sort_values(ascending=False)
             )
 
+            if len(gpu_avg_power) >= tensor_parallel_size:
+                # Take the top N GPUs by power consumption (most likely the allocated ones)
+                allocated_gpus = gpu_avg_power.head(tensor_parallel_size)
+                allocated_devices = allocated_gpus.index.tolist()
+
+                # Calculate power metrics for allocated GPUs only
+                allocated_power_df = power_df[power_df["device"].isin(allocated_devices)]
+                allocated_power_series = allocated_power_df.groupby("timestamp")[
+                    "power_watts"
+                ].sum()
+
+                results.update(
+                    {
+                        "avg_allocated_power": allocated_power_series.mean(),
+                        "power_efficiency_allocated": allocated_power_series.mean()
+                        / tensor_parallel_size,
+                        "num_active_gpus": tensor_parallel_size,
+                        "allocated_gpu_devices": allocated_devices,
+                    }
+                )
+            else:
+                # Fallback: use all available GPUs
+                results.update(
+                    {
+                        "avg_allocated_power": total_power_series.mean(),
+                        "power_efficiency_allocated": total_power_series.mean()
+                        / len(gpu_avg_power),
+                        "num_active_gpus": len(gpu_avg_power),
+                        "allocated_gpu_devices": gpu_avg_power.index.tolist(),
+                    }
+                )
+
         return results
 
     @staticmethod
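
Note (illustrative only, not part of the commit above): the diff leans on two small heuristics — a regex scan for "Tensor Parallel: N" in the master benchmark run log, and treating the tensor-parallel-size devices with the highest mean power draw as the allocated GPUs. A minimal standalone sketch of both follows; the log line format and the power readings are made up for demonstration.

import re

import pandas as pd

# 1) Parse "Tensor Parallel: N", as _extract_tensor_parallel_from_log does.
sample_log_line = "2024-01-01 12:00:00 | Config | Tensor Parallel: 2"  # hypothetical log format
match = re.search(r"Tensor Parallel:\s*(\d+)", sample_log_line)
tensor_parallel_size = int(match.group(1)) if match else 1
print(tensor_parallel_size)  # -> 2

# 2) Pick the "allocated" GPUs as the top-N devices by mean power, as in
#    _analyze_power_efficiency. Sample readings: GPU0/GPU1 busy, GPU2 idle.
power_df = pd.DataFrame(
    {
        "timestamp": [0, 0, 0, 1, 1, 1],
        "device": ["GPU0", "GPU1", "GPU2", "GPU0", "GPU1", "GPU2"],
        "power_watts": [250.0, 240.0, 35.0, 260.0, 245.0, 36.0],
    }
)
gpu_avg_power = power_df.groupby("device")["power_watts"].mean().sort_values(ascending=False)
allocated_devices = gpu_avg_power.head(tensor_parallel_size).index.tolist()
allocated_power = (
    power_df[power_df["device"].isin(allocated_devices)].groupby("timestamp")["power_watts"].sum()
)
print(allocated_devices)  # -> ['GPU0', 'GPU1']
print(allocated_power.mean() / tensor_parallel_size)  # average power per allocated GPU

The sketch assumes the allocated GPUs draw noticeably more power than idle ones; if that does not hold, the top-N selection in the diff falls back to whatever ordering the mean power gives.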