@@ -73,13 +73,13 @@ def _process_worker(
7373 sys .stderr .reconfigure (line_buffering = True ) # type: ignore
7474 except AttributeError :
7575 pass
76-
76+
7777 # Set up signal handler for clean interruption
7878 def signal_handler (signum , frame ):
7979 logger .warning (f"Worker { worker_id } : Received interrupt signal" )
8080 # Raise KeyboardInterrupt to actually interrupt the worker
8181 raise KeyboardInterrupt (f"Worker { worker_id } interrupted by user" )
82-
82+
8383 signal .signal (signal .SIGINT , signal_handler )
8484
8585 # Reinitialize telemetry in this process
@@ -173,13 +173,19 @@ async def process_single_task(index: int, task_dict: dict[str, Any]) -> tuple[in
173173 except asyncio .CancelledError :
174174 logger .info (f"Worker { worker_id } : Tasks cancelled due to interruption" )
175175 # Return error results for all tasks
176- return [(idx , {
177- "error" : "Task cancelled (Ctrl+C)" ,
178- "isError" : True ,
179- "reward" : 0.0 ,
180- "done" : False ,
181- "content" : "Task cancelled"
182- }) for idx , _ in task_batch ]
176+ return [
177+ (
178+ idx ,
179+ {
180+ "error" : "Task cancelled (Ctrl+C)" ,
181+ "isError" : True ,
182+ "reward" : 0.0 ,
183+ "done" : False ,
184+ "content" : "Task cancelled" ,
185+ },
186+ )
187+ for idx , _ in task_batch
188+ ]
183189
184190 try :
185191 # Run the async batch processing
@@ -206,13 +212,18 @@ async def process_single_task(index: int, task_dict: dict[str, Any]) -> tuple[in
206212 # Return partial results for tasks that completed
207213 partial_results = []
208214 for idx , _ in task_batch :
209- partial_results .append ((idx , {
210- "error" : "Worker interrupted by user (Ctrl+C)" ,
211- "isError" : True ,
212- "reward" : 0.0 ,
213- "done" : False ,
214- "content" : "Task interrupted"
215- }))
215+ partial_results .append (
216+ (
217+ idx ,
218+ {
219+ "error" : "Worker interrupted by user (Ctrl+C)" ,
220+ "isError" : True ,
221+ "reward" : 0.0 ,
222+ "done" : False ,
223+ "content" : "Task interrupted" ,
224+ },
225+ )
226+ )
216227 return partial_results
217228 except Exception as e :
218229 logger .error ("[Worker %s] Batch processing failed: %s" , worker_id , e )
@@ -443,7 +454,9 @@ async def run_dataset_parallel_manual(
443454
444455 except Exception as e :
445456 # Handle worker failure
446- logger .error ("Worker failed with exception: %s\n %s" , e , traceback .format_exc ())
457+ logger .error (
458+ "Worker failed with exception: %s\n %s" , e , traceback .format_exc ()
459+ )
447460
448461 # Mark all tasks in this batch as failed
449462 for index , _ in batch :
@@ -455,16 +468,16 @@ async def run_dataset_parallel_manual(
455468 "content" : f"Worker process failed: { e } " ,
456469 }
457470 completed += 1
458-
471+
459472 except KeyboardInterrupt :
460473 logger .warning ("\n ⚠️ Parallel evaluation interrupted by user (Ctrl+C)" )
461474 logger .info ("Cancelling pending tasks..." )
462-
475+
463476 # Cancel all pending futures
464477 for future in future_to_batch :
465478 if not future .done ():
466479 future .cancel ()
467-
480+
468481 # Mark uncompleted tasks as interrupted
469482 for i , r in enumerate (results ):
470483 if r is None :
@@ -475,10 +488,10 @@ async def run_dataset_parallel_manual(
475488 "done" : False ,
476489 "content" : "Task interrupted (Ctrl+C)" ,
477490 }
478-
491+
479492 logger .info (f"Interrupted after { completed } /{ total } tasks" )
480493 raise # Re-raise to propagate the interrupt
481-
494+
482495 finally :
483496 # Always shutdown the executor properly
484497 executor .shutdown (wait = False , cancel_futures = True )
0 commit comments