@@ -36,6 +36,73 @@ def __init__(self):
3636
3737 self .tick_size = 12
3838
39+ self .pkl_files_info = {}
40+
def get_file_stat(self, file_path):
    """Return a {'mtime', 'size'} fingerprint for *file_path*, or None.

    The fingerprint is used to detect pkl checkpoint changes without
    re-reading file contents.

    Parameters:
        file_path: path of the file to stat.

    Returns:
        dict with 'mtime' (float) and 'size' (int), or None when the
        file cannot be stat'd (missing, permission denied, ...).
    """
    try:
        stat = os.stat(file_path)
        return {
            'mtime': stat.st_mtime,
            'size': stat.st_size,
        }
    except OSError as e:
        # os.stat only raises OSError for file-system problems; catching
        # that (not bare Exception) lets genuine bugs surface while a
        # missing/unreadable checkpoint is reported and treated as "no info".
        print(f"Error getting file info for {file_path}: {e}")
        return None
51+
def check_pkl_files_changed(self):
    """Return True if any tracked pkl checkpoint file changed on disk.

    A file counts as changed when it has no recorded fingerprint yet or
    its current mtime/size differs from the one in self.pkl_files_info.
    Files that cannot be stat'd are skipped (treated as unchanged).
    """
    if not self.runtime_template or not self.data_parser:
        return False

    base_dir = self.data_parser.pkl_files_dir
    tracked = ('workers.pkl', 'files.pkl', 'tasks.pkl', 'manager.pkl', 'subgraphs.pkl')

    for name in tracked:
        path = os.path.join(base_dir, name)
        current = self.get_file_stat(path)
        if current is None:
            # Missing/unreadable checkpoint file: nothing to compare against.
            continue

        recorded = self.pkl_files_info.get(path)
        if (recorded is None
                or current['mtime'] != recorded['mtime']
                or current['size'] != recorded['size']):
            print(f"Detected changes in {name}")
            return True

    return False
73+
def update_pkl_files_info(self):
    """Refresh the recorded mtime/size fingerprints of the tracked pkl files.

    No-op when no runtime template or data parser is configured. Files
    that cannot be stat'd keep their previous fingerprint (if any).
    """
    if not self.runtime_template or not self.data_parser:
        return

    base_dir = self.data_parser.pkl_files_dir
    tracked = ('workers.pkl', 'files.pkl', 'tasks.pkl', 'manager.pkl', 'subgraphs.pkl')

    for name in tracked:
        full_path = os.path.join(base_dir, name)
        fingerprint = self.get_file_stat(full_path)
        if fingerprint:
            self.pkl_files_info[full_path] = fingerprint
86+
def reload_data(self):
    """Re-read all parsed data from the checkpoint and refresh fingerprints.

    Re-binds manager/workers/files/tasks/subgraphs from the data parser,
    recomputes the visible time window, then records the new pkl
    fingerprints. Failures are printed, not raised, so the server keeps
    serving the previously loaded data.
    """
    try:
        print("Reloading data from checkpoint...")
        parser = self.data_parser
        parser.restore_from_checkpoint()

        # Re-bind the freshly restored collections.
        self.manager = parser.manager
        self.workers = parser.workers
        self.files = parser.files
        self.tasks = parser.tasks
        self.subgraphs = parser.subgraphs

        # Recompute the time window from the new manager state.
        self.MIN_TIME = self.manager.when_first_task_start_commit
        self.MAX_TIME = self.manager.time_end

        self.update_pkl_files_info()
        print("Data reload completed successfully")
    except Exception as e:
        # Boundary-level best-effort catch: report and keep the old state.
        print(f"Error reloading data: {e}")
        traceback.print_exc()
105+
39106 def change_runtime_template (self , runtime_template ):
40107 if not runtime_template :
41108 return
@@ -58,6 +125,17 @@ def change_runtime_template(self, runtime_template):
58125 self .MIN_TIME = self .manager .when_first_task_start_commit
59126 self .MAX_TIME = self .manager .time_end
60127
128+ self .update_pkl_files_info ()
129+
def check_and_reload_data():
    """Decorator factory for Flask views: hot-reload data before each request.

    Before invoking the wrapped view it asks the module-level
    template_manager whether any pkl checkpoint changed on disk and, if
    so, reloads the parsed data.

    functools.wraps is essential here: without it every decorated view
    function is named 'wrapper', and Flask (which derives endpoint names
    from __name__) raises an overwriting-endpoint error as soon as a
    second route uses this decorator.
    """
    from functools import wraps  # local import keeps this change self-contained

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if template_manager.check_pkl_files_changed():
                template_manager.reload_data()
            return func(*args, **kwargs)
        return wrapper
    return decorator
138+
61139def all_subfolders_exists (parent : str , folder_names : list [str ]) -> bool :
62140 parent_path = Path (parent ).resolve ()
63141 for folder_name in folder_names :
@@ -70,6 +148,7 @@ def all_subfolders_exists(parent: str, folder_names: list[str]) -> bool:
70148app = Flask (__name__ )
71149
72150@app .route ('/api/execution-details' )
151+ @check_and_reload_data ()
73152def get_execution_details ():
74153 try :
75154 data : Dict [str , Any ] = {}
@@ -244,6 +323,7 @@ def downsample_storage_data(points):
244323 return result
245324
246325@app .route ('/api/storage-consumption' )
326+ @check_and_reload_data ()
247327def get_storage_consumption ():
248328 try :
249329 show_percentage = request .args .get ('show_percentage' , 'false' ).lower () == 'true'
@@ -432,6 +512,7 @@ def downsample_worker_transfers(points):
432512 return result
433513
434514@app .route ('/api/worker-transfers' )
515+ @check_and_reload_data ()
435516def get_worker_transfers ():
436517 try :
437518 # Get the transfer type from query parameters
@@ -565,6 +646,7 @@ def downsample_task_execution_time(points):
565646 return result
566647
567648@app .route ('/api/task-execution-time' )
649+ @check_and_reload_data ()
568650def get_task_execution_time ():
569651 try :
570652 data = {}
@@ -684,6 +766,7 @@ def downsample_task_concurrency(points):
684766 return result
685767
686768@app .route ('/api/task-concurrency' )
769+ @check_and_reload_data ()
687770def get_task_concurrency ():
688771 try :
689772 data = {}
@@ -855,6 +938,7 @@ def downsample_file_replicas(points):
855938 return result
856939
857940@app .route ('/api/file-replicas' )
941+ @check_and_reload_data ()
858942def get_file_replicas ():
859943 try :
860944 order = request .args .get ('order' , 'desc' ) # default to descending
@@ -984,6 +1068,7 @@ def downsample_file_sizes(points):
9841068 return result
9851069
9861070@app .route ('/api/file-sizes' )
1071+ @check_and_reload_data ()
9871072def get_file_sizes ():
9881073 try :
9891074 # Get the transfer type from query parameters
@@ -1077,6 +1162,7 @@ def get_file_sizes():
10771162 return jsonify ({'error' : str (e )}), 500
10781163
10791164@app .route ('/api/subgraphs' )
1165+ @check_and_reload_data ()
10801166def get_subgraphs ():
10811167 try :
10821168 data = {}
0 commit comments