
Commit d890af7

Merge pull request #83 from codelion/fix-timeout-implementation
fix and add test
2 parents 3d783a2 + 0fac26a

File tree

6 files changed: +630 -51 lines changed

examples/mlx_metal_kernel_opt/README.md

Lines changed: 1 addition & 27 deletions

@@ -1,4 +1,4 @@
-# 🎯 Qwen3-0.6B Custom Metal Kernel Optimization with OpenEvolve
+# 🎯Custom Metal Kernel Optimization with OpenEvolve
 
 **Evolving custom GPU kernels for Grouped Query Attention using MLX Metal kernels for Qwen3-0.6B on Apple Silicon**
 

@@ -416,29 +416,3 @@ python run_benchmarks.py --mode compare
 ---
 
 **🎯 This example demonstrates OpenEvolve's capability to discover genuine algorithmic improvements through evolutionary optimization, achieving measurable performance gains on real hardware with production-ready implementations.**
-
-## 🔧 **Recent Improvements**
-
-### **✅ Correct Terminology**
-- **Before**: Incorrect references to "chunked GQA processing"
-- **After**: Accurate descriptions of custom Metal kernel optimization
-- **Benefits**: Technical accuracy and clear understanding of actual discoveries
-
-### **✅ Comprehensive Testing**
-- **Before**: Basic performance measurement
-- **After**: 17-scenario comprehensive benchmark suite with statistical validation
-- **Benefits**: Robust performance analysis and reproducible results
-
-### **✅ Production Integration**
-- **Before**: Standalone optimization experiments
-- **After**: Full MLX-LM integration with seamless switching
-- **Benefits**: Real-world usability and easy adoption
-
-### **✅ Detailed Documentation**
-- **Before**: High-level optimization descriptions
-- **After**: Complete technical details with actual kernel code snippets
-- **Benefits**: Understanding, reproducibility, and further research
-
----
-
-**🚀 Ready for custom Metal kernel evolution with comprehensive benchmarking and detailed analysis!**

openevolve/evaluator.py

Lines changed: 105 additions & 21 deletions

@@ -134,6 +134,20 @@ async def evaluate_program(
                 # Process the result based on type
                 eval_result = self._process_evaluation_result(result)
 
+                # Check if this was a timeout and capture artifacts if enabled
+                if artifacts_enabled and program_id and eval_result.metrics.get("timeout") is True:
+                    if program_id not in self._pending_artifacts:
+                        self._pending_artifacts[program_id] = {}
+
+                    self._pending_artifacts[program_id].update(
+                        {
+                            "timeout": True,
+                            "timeout_duration": self.config.timeout,
+                            "failure_stage": "evaluation",
+                            "error_type": "timeout",
+                        }
+                    )
+
                 # Add LLM feedback if configured
                 llm_eval_result = None
                 if self.config.use_llm_feedback and self.llm_ensemble:

@@ -153,7 +167,8 @@ async def evaluate_program(
                     )
                     and program_id
                 ):
-                    self._pending_artifacts[program_id] = {}
+                    if program_id not in self._pending_artifacts:
+                        self._pending_artifacts[program_id] = {}
 
                     # Merge eval_result artifacts with llm artifacts if they exist
                     if eval_result.has_artifacts():

@@ -179,6 +194,21 @@ async def evaluate_program(
                 # Return just metrics for backward compatibility
                 return eval_result.metrics
 
+            except asyncio.TimeoutError:
+                # Handle timeout specially - don't retry, just return timeout result
+                logger.warning(f"Evaluation timed out after {self.config.timeout}s")
+
+                # Capture timeout artifacts if enabled
+                if artifacts_enabled and program_id:
+                    self._pending_artifacts[program_id] = {
+                        "timeout": True,
+                        "timeout_duration": self.config.timeout,
+                        "failure_stage": "evaluation",
+                        "error_type": "timeout",
+                    }
+
+                return {"error": 0.0, "timeout": True}
+
             except Exception as e:
                 last_exception = e
                 logger.warning(

@@ -192,6 +222,7 @@ async def evaluate_program(
                         "stderr": str(e),
                         "traceback": traceback.format_exc(),
                         "failure_stage": "evaluation",
+                        "attempt": attempt + 1,
                     }
 
                 # If this is not the last attempt, wait a bit before retrying

@@ -242,32 +273,36 @@ def get_pending_artifacts(self, program_id: str) -> Optional[Dict[str, Union[str
         """
         return self._pending_artifacts.pop(program_id, None)
 
-    @run_in_executor
-    def _direct_evaluate(self, program_path: str) -> Dict[str, float]:
+    async def _direct_evaluate(self, program_path: str) -> Dict[str, float]:
         """
-        Directly evaluate a program using the evaluation function
+        Directly evaluate a program using the evaluation function with timeout
 
         Args:
             program_path: Path to the program file
 
         Returns:
             Dictionary of metric name to score
+
+        Raises:
+            asyncio.TimeoutError: If evaluation exceeds timeout
+            Exception: If evaluation function raises an exception
         """
-        try:
-            # Run the evaluation with timeout
-            result = self.evaluate_function(program_path)
 
-            # Validate result
-            if not isinstance(result, dict):
-                logger.warning(f"Evaluation returned non-dictionary result: {result}")
-                return {"error": 0.0}
+        # Create a coroutine that runs the evaluation function in an executor
+        async def run_evaluation():
+            loop = asyncio.get_event_loop()
+            return await loop.run_in_executor(None, self.evaluate_function, program_path)
 
-            return result
+        # Run the evaluation with timeout - let exceptions bubble up for retry handling
+        result = await asyncio.wait_for(run_evaluation(), timeout=self.config.timeout)
 
-        except Exception as e:
-            logger.error(f"Error in direct evaluation: {str(e)}")
+        # Validate result
+        if not isinstance(result, dict):
+            logger.warning(f"Evaluation returned non-dictionary result: {result}")
             return {"error": 0.0}
 
+        return result
+
     async def _cascade_evaluate(
         self, program_path: str
     ) -> Union[Dict[str, float], EvaluationResult]:
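
For context: the old `@run_in_executor`-decorated `_direct_evaluate` had no way to enforce `self.config.timeout`. The new version moves the blocking `evaluate_function` call into `loop.run_in_executor` so `asyncio.wait_for` can enforce the timeout from the event loop. A minimal standalone sketch of the same pattern, with a hypothetical `slow_evaluate` standing in for a user-supplied evaluation function:

import asyncio
import time


def slow_evaluate(program_path: str) -> dict:
    # Hypothetical stand-in for a blocking, user-supplied evaluate_function
    time.sleep(5)
    return {"score": 1.0}


async def evaluate_with_timeout(path: str, timeout: float) -> dict:
    loop = asyncio.get_event_loop()
    # Running the blocking call in a worker thread lets wait_for enforce
    # the timeout from the event loop
    future = loop.run_in_executor(None, slow_evaluate, path)
    try:
        return await asyncio.wait_for(future, timeout=timeout)
    except asyncio.TimeoutError:
        # Note: wait_for stops awaiting, but the worker thread itself
        # keeps running to completion in the background
        return {"error": 0.0, "timeout": True}


print(asyncio.run(evaluate_with_timeout("program.py", timeout=1.0)))
# -> {'error': 0.0, 'timeout': True} after roughly one second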
@@ -299,10 +334,24 @@ async def _cascade_evaluate
             if not hasattr(module, "evaluate_stage1"):
                 return await self._direct_evaluate(program_path)
 
-            # Run first stage
+            # Run first stage with timeout
             try:
-                stage1_result = await run_in_executor(module.evaluate_stage1)(program_path)
+
+                async def run_stage1():
+                    loop = asyncio.get_event_loop()
+                    return await loop.run_in_executor(None, module.evaluate_stage1, program_path)
+
+                stage1_result = await asyncio.wait_for(run_stage1(), timeout=self.config.timeout)
                 stage1_eval_result = self._process_evaluation_result(stage1_result)
+            except asyncio.TimeoutError:
+                logger.warning(f"Stage 1 evaluation timed out after {self.config.timeout}s")
+                return EvaluationResult(
+                    metrics={"stage1_passed": 0.0, "error": 0.0, "timeout": True},
+                    artifacts={
+                        "failure_stage": "stage1",
+                        "timeout": True,
+                    },
+                )
             except Exception as e:
                 logger.error(f"Error in stage 1 evaluation: {str(e)}")
                 # Capture stage 1 failure as artifacts

@@ -325,10 +374,27 @@ async def _cascade_evaluate
             if not hasattr(module, "evaluate_stage2"):
                 return stage1_eval_result
 
-            # Run second stage
+            # Run second stage with timeout
             try:
-                stage2_result = await run_in_executor(module.evaluate_stage2)(program_path)
+
+                async def run_stage2():
+                    loop = asyncio.get_event_loop()
+                    return await loop.run_in_executor(None, module.evaluate_stage2, program_path)
+
+                stage2_result = await asyncio.wait_for(run_stage2(), timeout=self.config.timeout)
                 stage2_eval_result = self._process_evaluation_result(stage2_result)
+            except asyncio.TimeoutError:
+                logger.warning(f"Stage 2 evaluation timed out after {self.config.timeout}s")
+                # Capture stage 2 failure, but keep stage 1 results
+                stage1_eval_result.artifacts.update(
+                    {
+                        "stage2_timeout": True,
+                        "failure_stage": "stage2",
+                    }
+                )
+                stage1_eval_result.metrics["stage2_passed"] = 0.0
+                stage1_eval_result.metrics["timeout"] = True
+                return stage1_eval_result
             except Exception as e:
                 logger.error(f"Error in stage 2 evaluation: {str(e)}")
                 # Capture stage 2 failure, but keep stage 1 results

@@ -370,10 +436,27 @@ async def _cascade_evaluate
             if not hasattr(module, "evaluate_stage3"):
                 return merged_result
 
-            # Run third stage
+            # Run third stage with timeout
            try:
-                stage3_result = await run_in_executor(module.evaluate_stage3)(program_path)
+
+                async def run_stage3():
+                    loop = asyncio.get_event_loop()
+                    return await loop.run_in_executor(None, module.evaluate_stage3, program_path)
+
+                stage3_result = await asyncio.wait_for(run_stage3(), timeout=self.config.timeout)
                 stage3_eval_result = self._process_evaluation_result(stage3_result)
+            except asyncio.TimeoutError:
+                logger.warning(f"Stage 3 evaluation timed out after {self.config.timeout}s")
+                # Capture stage 3 failure, but keep previous results
+                merged_result.artifacts.update(
+                    {
+                        "stage3_timeout": True,
+                        "failure_stage": "stage3",
+                    }
+                )
+                merged_result.metrics["stage3_passed"] = 0.0
+                merged_result.metrics["timeout"] = True
+                return merged_result
             except Exception as e:
                 logger.error(f"Error in stage 3 evaluation: {str(e)}")
                 # Capture stage 3 failure, but keep previous results

@@ -398,8 +481,9 @@ async def _cascade_evaluate
 
         except Exception as e:
             logger.error(f"Error in cascade evaluation: {str(e)}")
+            # Return proper cascade failure result instead of re-raising
             return EvaluationResult(
-                metrics={"error": 0.0},
+                metrics={"stage1_passed": 0.0, "error": 0.0},
                 artifacts={
                     "stderr": str(e),
                     "traceback": traceback.format_exc(),

openevolve/utils/async_utils.py

Lines changed: 63 additions & 1 deletion

@@ -32,6 +32,60 @@ async def wrapper(*args: Any, **kwargs: Any) -> Any:
     return wrapper
 
 
+async def run_with_timeout(
+    coro: Callable, timeout: float, *args: Any, timeout_error_value: Any = None, **kwargs: Any
+) -> Any:
+    """
+    Run a coroutine with a timeout, returning a default value on timeout
+
+    Args:
+        coro: Coroutine function to run
+        timeout: Timeout in seconds
+        *args: Arguments to pass to the coroutine
+        timeout_error_value: Value to return on timeout (default: {"error": 0.0, "timeout": True})
+        **kwargs: Keyword arguments to pass to the coroutine
+
+    Returns:
+        Result of the coroutine or timeout_error_value on timeout
+    """
+    if timeout_error_value is None:
+        timeout_error_value = {"error": 0.0, "timeout": True}
+
+    try:
+        return await asyncio.wait_for(coro(*args, **kwargs), timeout=timeout)
+    except asyncio.TimeoutError:
+        logger.warning(f"Operation timed out after {timeout}s")
+        return timeout_error_value
+
+
+async def run_sync_with_timeout(
+    func: Callable, timeout: float, *args: Any, timeout_error_value: Any = None, **kwargs: Any
+) -> Any:
+    """
+    Run a synchronous function in an executor with a timeout
+
+    Args:
+        func: Synchronous function to run
+        timeout: Timeout in seconds
+        *args: Arguments to pass to the function
+        timeout_error_value: Value to return on timeout (default: {"error": 0.0, "timeout": True})
+        **kwargs: Keyword arguments to pass to the function
+
+    Returns:
+        Result of the function or timeout_error_value on timeout
+    """
+    if timeout_error_value is None:
+        timeout_error_value = {"error": 0.0, "timeout": True}
+
+    try:
+        loop = asyncio.get_event_loop()
+        task = loop.run_in_executor(None, functools.partial(func, *args, **kwargs))
+        return await asyncio.wait_for(task, timeout=timeout)
+    except asyncio.TimeoutError:
+        logger.warning(f"Sync operation timed out after {timeout}s")
+        return timeout_error_value
+
+
 async def gather_with_concurrency(
     n: int, *tasks: asyncio.Future, return_exceptions: bool = False
 ) -> List[Any]:
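
Both helpers funnel timeouts into the same `{"error": 0.0, "timeout": True}` sentinel the evaluator uses, so callers can branch on the returned value instead of catching `asyncio.TimeoutError`. A usage sketch (import path inferred from this file's location; `quick` is an illustrative coroutine):

import asyncio
import time

from openevolve.utils.async_utils import run_sync_with_timeout, run_with_timeout


async def quick(x):
    await asyncio.sleep(0.1)
    return {"score": x}


async def main():
    # Async callable that finishes within the timeout: result passes through
    print(await run_with_timeout(quick, 1.0, 0.5))  # {'score': 0.5}

    # Blocking callable that exceeds the timeout: sentinel value is returned
    # (the worker thread keeps running in the background after the timeout)
    print(await run_sync_with_timeout(time.sleep, 0.5, 3))
    # -> {'error': 0.0, 'timeout': True}


asyncio.run(main())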
@@ -116,9 +170,17 @@ class TaskPool:
     """
 
     def __init__(self, max_concurrency: int = 10):
-        self.semaphore = asyncio.Semaphore(max_concurrency)
+        self.max_concurrency = max_concurrency
+        self._semaphore: Optional[asyncio.Semaphore] = None
         self.tasks: List[asyncio.Task] = []
 
+    @property
+    def semaphore(self) -> asyncio.Semaphore:
+        """Lazy-initialize the semaphore when first needed"""
+        if self._semaphore is None:
+            self._semaphore = asyncio.Semaphore(self.max_concurrency)
+        return self._semaphore
+
     async def run(self, coro: Callable, *args: Any, **kwargs: Any) -> Any:
         """
         Run a coroutine in the pool
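
The lazy `semaphore` property matters when a `TaskPool` is constructed before any event loop is running, for example at module import time: on Python versions before 3.10, `asyncio.Semaphore` binds to the current event loop at construction, so an eagerly created semaphore could later fail with an "attached to a different loop" error under `asyncio.run()`. Deferring creation to first use inside a running coroutine avoids that. A sketch of the usage this change presumably keeps safe:

import asyncio

from openevolve.utils.async_utils import TaskPool

# Constructed at import time, before any event loop exists; the semaphore
# is now created lazily on first use inside a running coroutine
pool = TaskPool(max_concurrency=2)


async def work(i):
    await asyncio.sleep(0.1)
    return i


async def main():
    results = await asyncio.gather(*(pool.run(work, i) for i in range(5)))
    print(results)  # [0, 1, 2, 3, 4]


asyncio.run(main())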

pyproject.toml

Lines changed: 1 addition & 1 deletion

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "openevolve"
-version = "0.0.1"
+version = "0.0.2"
 description = "Open-source implementation of AlphaEvolve"
 readme = "README.md"
 requires-python = ">=3.9"

setup.py

Lines changed: 1 addition & 1 deletion

@@ -2,7 +2,7 @@
 
 setup(
     name="openevolve",
-    version="0.0.1",
+    version="0.0.2",
     packages=find_packages(),
     include_package_data=True,
 )
