@@ -242,10 +242,9 @@ def get_pending_artifacts(self, program_id: str) -> Optional[Dict[str, Union[str
242242 """
243243 return self ._pending_artifacts .pop (program_id , None )
244244
245- @run_in_executor
246- def _direct_evaluate (self , program_path : str ) -> Dict [str , float ]:
245+ async def _direct_evaluate (self , program_path : str ) -> Dict [str , float ]:
247246 """
248- Directly evaluate a program using the evaluation function
247+ Directly evaluate a program using the evaluation function with timeout
249248
250249 Args:
251250 program_path: Path to the program file
@@ -254,8 +253,13 @@ def _direct_evaluate(self, program_path: str) -> Dict[str, float]:
254253 Dictionary of metric name to score
255254 """
256255 try :
256+ # Create a coroutine that runs the evaluation function in an executor
async def run_evaluation():
    """Run ``self.evaluate_function(program_path)`` in the default executor.

    Returns:
        Whatever ``self.evaluate_function`` returns for ``program_path``.

    Note:
        The call runs in a worker thread. If the awaiting caller gives up
        (for example via ``asyncio.wait_for``), only the await is abandoned;
        a thread that has already started keeps running until the function
        returns.
    """
    # get_running_loop() is the documented way to obtain the loop from
    # inside a coroutine; it never creates or guesses a loop implicitly.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, self.evaluate_function, program_path)
260+
257261 # Run the evaluation with timeout
258- result = self . evaluate_function ( program_path )
262+ result = await asyncio . wait_for ( run_evaluation (), timeout = self . config . timeout )
259263
260264 # Validate result
261265 if not isinstance (result , dict ):
@@ -264,6 +268,9 @@ def _direct_evaluate(self, program_path: str) -> Dict[str, float]:
264268
265269 return result
266270
271+ except asyncio .TimeoutError :
272+ logger .warning (f"Evaluation timed out after { self .config .timeout } s" )
273+ return {"error" : 0.0 , "timeout" : True }
267274 except Exception as e :
268275 logger .error (f"Error in direct evaluation: { str (e )} " )
269276 return {"error" : 0.0 }
@@ -299,10 +306,24 @@ async def _cascade_evaluate(
299306 if not hasattr (module , "evaluate_stage1" ):
300307 return await self ._direct_evaluate (program_path )
301308
302- # Run first stage
309+ # Run first stage with timeout
303310 try :
304- stage1_result = await run_in_executor (module .evaluate_stage1 )(program_path )
311+
async def run_stage1():
    """Run ``module.evaluate_stage1(program_path)`` in the default executor.

    Returns:
        Whatever ``module.evaluate_stage1`` returns for ``program_path``.

    Note:
        The call runs in a worker thread. A timeout applied by the caller
        abandons the await but does not stop a thread that has already
        started executing the stage.
    """
    # get_running_loop() is the documented way to obtain the loop from
    # inside a coroutine; it never creates or guesses a loop implicitly.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, module.evaluate_stage1, program_path)
315+
316+ stage1_result = await asyncio .wait_for (run_stage1 (), timeout = self .config .timeout )
305317 stage1_eval_result = self ._process_evaluation_result (stage1_result )
318+ except asyncio .TimeoutError :
319+ logger .warning (f"Stage 1 evaluation timed out after { self .config .timeout } s" )
320+ return EvaluationResult (
321+ metrics = {"stage1_passed" : 0.0 , "error" : 0.0 , "timeout" : True },
322+ artifacts = {
323+ "failure_stage" : "stage1" ,
324+ "timeout" : True ,
325+ },
326+ )
306327 except Exception as e :
307328 logger .error (f"Error in stage 1 evaluation: { str (e )} " )
308329 # Capture stage 1 failure as artifacts
@@ -325,10 +346,27 @@ async def _cascade_evaluate(
325346 if not hasattr (module , "evaluate_stage2" ):
326347 return stage1_eval_result
327348
328- # Run second stage
349+ # Run second stage with timeout
329350 try :
330- stage2_result = await run_in_executor (module .evaluate_stage2 )(program_path )
351+
async def run_stage2():
    """Run ``module.evaluate_stage2(program_path)`` in the default executor.

    Returns:
        Whatever ``module.evaluate_stage2`` returns for ``program_path``.

    Note:
        The call runs in a worker thread. A timeout applied by the caller
        abandons the await but does not stop a thread that has already
        started executing the stage.
    """
    # get_running_loop() is the documented way to obtain the loop from
    # inside a coroutine; it never creates or guesses a loop implicitly.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, module.evaluate_stage2, program_path)
355+
356+ stage2_result = await asyncio .wait_for (run_stage2 (), timeout = self .config .timeout )
331357 stage2_eval_result = self ._process_evaluation_result (stage2_result )
358+ except asyncio .TimeoutError :
359+ logger .warning (f"Stage 2 evaluation timed out after { self .config .timeout } s" )
360+ # Capture stage 2 failure, but keep stage 1 results
361+ stage1_eval_result .artifacts .update (
362+ {
363+ "stage2_timeout" : True ,
364+ "failure_stage" : "stage2" ,
365+ }
366+ )
367+ stage1_eval_result .metrics ["stage2_passed" ] = 0.0
368+ stage1_eval_result .metrics ["timeout" ] = True
369+ return stage1_eval_result
332370 except Exception as e :
333371 logger .error (f"Error in stage 2 evaluation: { str (e )} " )
334372 # Capture stage 2 failure, but keep stage 1 results
@@ -370,10 +408,27 @@ async def _cascade_evaluate(
370408 if not hasattr (module , "evaluate_stage3" ):
371409 return merged_result
372410
373- # Run third stage
411+ # Run third stage with timeout
374412 try :
375- stage3_result = await run_in_executor (module .evaluate_stage3 )(program_path )
413+
async def run_stage3():
    """Run ``module.evaluate_stage3(program_path)`` in the default executor.

    Returns:
        Whatever ``module.evaluate_stage3`` returns for ``program_path``.

    Note:
        The call runs in a worker thread. A timeout applied by the caller
        abandons the await but does not stop a thread that has already
        started executing the stage.
    """
    # get_running_loop() is the documented way to obtain the loop from
    # inside a coroutine; it never creates or guesses a loop implicitly.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, module.evaluate_stage3, program_path)
417+
418+ stage3_result = await asyncio .wait_for (run_stage3 (), timeout = self .config .timeout )
376419 stage3_eval_result = self ._process_evaluation_result (stage3_result )
420+ except asyncio .TimeoutError :
421+ logger .warning (f"Stage 3 evaluation timed out after { self .config .timeout } s" )
422+ # Capture stage 3 failure, but keep previous results
423+ merged_result .artifacts .update (
424+ {
425+ "stage3_timeout" : True ,
426+ "failure_stage" : "stage3" ,
427+ }
428+ )
429+ merged_result .metrics ["stage3_passed" ] = 0.0
430+ merged_result .metrics ["timeout" ] = True
431+ return merged_result
377432 except Exception as e :
378433 logger .error (f"Error in stage 3 evaluation: { str (e )} " )
379434 # Capture stage 3 failure, but keep previous results
0 commit comments