@@ -133,30 +133,7 @@ def _create_progress_bar(self):
133133 )
134134
135135 def write_df_to_csv (self , df , csv_file_path , ** kwargs ):
136- import os
137- import math
138-
139- total_rows = len (df )
140- if total_rows == 0 :
141- df .to_csv (csv_file_path , ** kwargs )
142- return
143-
144- num_chunks = min (100 , total_rows )
145- chunk_size = math .ceil (total_rows / num_chunks )
146- filename = os .path .basename (csv_file_path )
147-
148- progress = self ._create_progress_bar ()
149- task_id = progress .add_task (f"=== Writing { filename } " , total = total_rows )
150-
151- with progress :
152- with open (csv_file_path , 'w' , newline = '' ) as f :
153- for i in range (num_chunks ):
154- start = i * chunk_size
155- end = min ((i + 1 ) * chunk_size , total_rows )
156- chunk = df .iloc [start :end ]
157- header = (i == 0 )
158- chunk .to_csv (f , header = header , ** kwargs )
159- progress .update (task_id , advance = len (chunk ))
136+ df .to_csv (csv_file_path , ** kwargs )
160137
161138 def get_current_worker_by_ip_port (self , worker_ip : str , worker_port : int ):
162139 connect_id = self .current_worker_connect_id [(worker_ip , worker_port )]
@@ -1701,7 +1678,7 @@ def generate_task_execution_details_metrics(self):
17011678 base_time = self .MIN_TIME
17021679 rows = []
17031680
1704- # Task status mappings
1681+ # Task status mappings
17051682 TASK_STATUS_NAMES = {
17061683 1 : 'unsuccessful-input-missing' ,
17071684 2 : 'unsuccessful-output-missing' ,
@@ -1768,7 +1745,7 @@ def generate_task_execution_details_metrics(self):
17681745 'when_waiting_retrieval' : round (task .when_waiting_retrieval - base_time , 2 ) if task .when_waiting_retrieval else None ,
17691746 'when_retrieved' : round (task .when_retrieved - base_time , 2 ) if task .when_retrieved else None ,
17701747 'when_done' : round (task .when_done - base_time , 2 ) if task .when_done else None ,
1771- 'task_type ' : 'successful ' ,
1748+ 'record_type ' : 'successful_tasks ' ,
17721749 'unsuccessful_checkbox_name' : '' ,
17731750 'when_failure_happens' : None ,
17741751 })
@@ -1781,7 +1758,7 @@ def generate_task_execution_details_metrics(self):
17811758 'when_failure_happens' : round (task .when_failure_happens - base_time , 2 ) if task .when_failure_happens else None ,
17821759 'execution_time' : round (task .when_failure_happens - task .when_running , 2 ) if task .when_failure_happens and task .when_running else None ,
17831760 'when_done' : round (task .when_done - base_time , 2 ) if task .when_done else None ,
1784- 'task_type ' : 'unsuccessful ' ,
1761+ 'record_type ' : 'unsuccessful_tasks ' ,
17851762 'unsuccessful_checkbox_name' : TASK_STATUS_NAMES .get (task .task_status , 'unknown' ),
17861763 })
17871764
@@ -1819,7 +1796,7 @@ def generate_task_execution_details_metrics(self):
18191796 'when_retrieved' : pd .NA ,
18201797 'when_failure_happens' : pd .NA ,
18211798 'when_done' : pd .NA ,
1822- 'task_type ' : 'worker' ,
1799+ 'record_type ' : 'worker' ,
18231800 'unsuccessful_checkbox_name' : pd .NA ,
18241801 'hash' : worker .hash ,
18251802 'time_connected' : [round (max (t - base_time , 0 ), 2 ) for t in worker .time_connected ],
@@ -1838,7 +1815,7 @@ def generate_task_execution_details_metrics(self):
18381815
18391816 # Define column order
18401817 columns = [
1841- 'task_type ' , 'task_id' , 'task_try_id' , 'worker_entry' , 'worker_id' , 'core_id' ,
1818+ 'record_type ' , 'task_id' , 'task_try_id' , 'worker_entry' , 'worker_id' , 'core_id' ,
18421819 'is_recovery_task' , 'input_files' , 'output_files' , 'num_input_files' , 'num_output_files' ,
18431820 'task_status' , 'category' , 'when_ready' , 'when_running' , 'time_worker_start' ,
18441821 'time_worker_end' , 'execution_time' , 'when_waiting_retrieval' , 'when_retrieved' ,
0 commit comments