11import os
22import datetime
3+ import subprocess
4+ import warnings
35import numpy as np
46from scipy .io import savemat
7+ from dataclasses import is_dataclass
8+ import acsl_pychrono .config .config as Cfg
59
610class Logging :
711 @staticmethod
8- def saveMatlabWorkspaceLog (log_dict , gains , controller_type ):
9- """
10- Save the log and gains data into a .mat file in a structured directory.
11-
12- Args:
13- log_dict (dict): Dictionary containing the log data.
14- gains (object): Object containing gain parameters as attributes.
15- controller_type (str): The name/type of the controller.
16- """
12+ def getOutputDir (sim_cfg : Cfg .SimulationConfig ) -> str :
13+ controller_type = Cfg .MissionConfig .controller_type
14+ wrapper_flag = Cfg .MissionConfig .wrapper_flag
1715
1816 # Get current time
1917 now = datetime .datetime .now ()
20- timestamp = now .strftime ("%Y%m%d_%H%M%S" )
2118 year = now .strftime ("%Y" )
2219 month = now .strftime ("%m" )
2320 full_date = now .strftime ("%Y%m%d" )
2421
2522 # Construct the directory path
26- dir_path = os .path .join ("logs" , year , month , full_date , controller_type , "workspaces" )
23+ if wrapper_flag :
24+ dir_path = os .path .join (sim_cfg .mission_config .wrapper_batch_dir )
25+ else :
26+ dir_path = os .path .join ("logs" , year , month , full_date , controller_type , "workspaces" )
2727 os .makedirs (dir_path , exist_ok = True ) # Create all directories if not present
2828
29+ return dir_path
30+
31+ @staticmethod
32+ def generateUniqueFilename (base_name : str , extension : str , dir_path : str , use_suffix : bool ) -> str :
33+ if not use_suffix :
34+ # No suffix, allow overwrite
35+ return os .path .join (dir_path , f"{ base_name } .{ extension } " )
36+
37+ # Suffix to avoid overwrite
38+ run_id = 1
39+ while True :
40+ filename = f"{ base_name } -{ run_id } .{ extension } "
41+ full_path = os .path .join (dir_path , filename )
42+ if not os .path .exists (full_path ):
43+ return full_path
44+ run_id += 1
45+
46+ @staticmethod
47+ def extractGainsDict (gains ) -> dict :
2948 # Create a dictionary from instance variables
3049 gains_dict = {
3150 key : value for key , value in gains .__dict__ .items ()
@@ -40,16 +59,102 @@ def saveMatlabWorkspaceLog(log_dict, gains, controller_type):
4059 for key in gains_dict_shortened :
4160 if isinstance (gains_dict_shortened [key ], np .matrix ):
4261 gains_dict_shortened [key ] = np .array (gains_dict_shortened [key ])
62+
63+ return gains_dict_shortened
64+
65+ @staticmethod
66+ def dataclassToDict (obj , truncate_keys : bool = True ):
67+ """
68+ Recursively convert a dataclass to a nested dictionary,
69+ handling nested dataclasses, NumPy arrays/matrices,
70+ truncating field names to 31 characters if specified,
71+ and filtering out unsupported types.
72+ """
73+ if is_dataclass (obj ):
74+ result = {}
75+ for key , value in obj .__dict__ .items ():
76+ # Filter to only include serializable fields
77+ if isinstance (value , (int , float , str , bool , np .ndarray , np .matrix )) or is_dataclass (value ):
78+ converted_value = Logging .dataclassToDict (value , truncate_keys = truncate_keys )
79+ key = key [:31 ] if truncate_keys else key
80+ if isinstance (converted_value , np .matrix ):
81+ converted_value = np .array (converted_value )
82+ result [key ] = converted_value
83+ return result
84+ elif isinstance (obj , dict ):
85+ return {k [:31 ] if truncate_keys else k : Logging .dataclassToDict (v , truncate_keys = truncate_keys ) for k , v in obj .items ()}
86+ elif isinstance (obj , (list , tuple )):
87+ return [Logging .dataclassToDict (v , truncate_keys = truncate_keys ) for v in obj ]
88+ elif isinstance (obj , np .matrix ):
89+ return np .array (obj )
90+ else :
91+ return obj
92+
93+ @staticmethod
94+ def getGitRepoInfo () -> dict :
95+ def run_git_cmd (args ):
96+ try :
97+ return subprocess .check_output (['git' ] + args , cwd = repo_dir , stderr = subprocess .DEVNULL ).decode ().strip ()
98+ except subprocess .CalledProcessError :
99+ return ""
100+ except FileNotFoundError :
101+ return ""
102+
103+ def get_github_url (repo_url : str , commit_hash : str ) -> str :
104+ if repo_url .endswith (".git" ):
105+ repo_url = repo_url [:- 4 ]
106+ return f"{ repo_url } /tree/{ commit_hash } "
107+
108+ repo_dir = os .getcwd ()
109+
110+ if not os .path .exists (os .path .join (repo_dir , '.git' )):
111+ warnings .warn ("⚠️ Git repository info cannot be tracked. No .git directory found." )
112+ return {
113+ "repo_path" : "" ,
114+ "remote_url" : "" ,
115+ "commit_hash" : "" ,
116+ "commit_tag" : "" ,
117+ "branch" : "" ,
118+ "dirty" : 0 ,
119+ "commit_url" : ""
120+ }
121+
122+ remote_url = run_git_cmd (['remote' , 'get-url' , 'origin' ]) or ""
123+ commit_hash = run_git_cmd (['rev-parse' , 'HEAD' ]) or ""
124+
125+ git_info = {
126+ "repo_path" : os .path .abspath (repo_dir ),
127+ "remote_url" : remote_url ,
128+ "commit_hash" : commit_hash ,
129+ "commit_tag" : run_git_cmd (['describe' , '--tags' , '--exact-match' ]) or "" ,
130+ "branch" : run_git_cmd (['rev-parse' , '--abbrev-ref' , 'HEAD' ]) or "" ,
131+ "dirty" : 1 if run_git_cmd (['status' , '--porcelain' ]) else 0 ,
132+ "commit_url" : get_github_url (remote_url , commit_hash ) or ""
133+ }
134+
135+ return git_info
136+
137+ @staticmethod
138+ def saveMatlabWorkspaceLog (log_dict , gains , sim_cfg : Cfg .SimulationConfig , git_info : dict | None = None ):
139+ timestamp = datetime .datetime .now ().strftime ("%Y%m%d_%H%M%S" )
140+ dir_path = Logging .getOutputDir (sim_cfg )
141+ os .makedirs (dir_path , exist_ok = True )
142+
143+ base_filename = f"workspace_log_{ timestamp } "
144+ full_path_log = Logging .generateUniqueFilename (
145+ base_filename ,
146+ "mat" ,
147+ dir_path ,
148+ Cfg .MissionConfig .wrapper_flag
149+ )
43150
44- # Nest gains inside the log structure
45151 mat_dict = {
46- "log" : log_dict , # Log data
47- "gains" : gains_dict_shortened # Nested gains struct
152+ "log" : log_dict ,
153+ "gains" : Logging .extractGainsDict (gains ),
154+ "sim_cfg" : Logging .dataclassToDict (sim_cfg )
48155 }
49156
50- # Construct the full file path for the log file
51- filename_log = f"workspace_log_{ timestamp } .mat"
52- full_path_log = os .path .join (dir_path , filename_log )
157+ if git_info is not None :
158+ mat_dict ["git_info" ] = git_info
53159
54- # Save the .mat file with the nested gains structure
55160 savemat (full_path_log , mat_dict )
0 commit comments