
Commit 470a271

Refactor to threaded parallel evolution and remove file locks
Replaces process-based parallelism with a new thread-based parallel controller using shared memory for improved performance and reliability. Removes filelock usage and related code from the database, as thread-based parallelism does not require file-based locking. Updates the main controller to use the new parallel system, adds checkpoint resume support, and adapts iteration logic for thread safety. Cleans up dependencies by removing filelock from requirements.
1 parent ab4d044 commit 470a271
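Note: the core of the change is that a process pool working against file-locked database snapshots is replaced by threads that share one in-memory database. Below is a minimal sketch of that pattern, not the actual `ImprovedParallelController`; the `SharedDatabase` and `run_one_iteration` names are illustrative only:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class SharedDatabase:
    """Illustrative stand-in for a ProgramDatabase shared across threads."""

    def __init__(self):
        # A single in-process lock replaces the old cross-process filelock
        self._lock = threading.Lock()
        self._programs = {}

    def add(self, program_id, program):
        with self._lock:
            self._programs[program_id] = program

def run_one_iteration(i, db):
    # Stand-in for generating and evaluating one candidate program
    db.add(f"prog-{i}", {"iteration": i})

db = SharedDatabase()
with ThreadPoolExecutor(max_workers=4) as pool:
    for i in range(16):
        pool.submit(run_one_iteration, i, db)
print(len(db._programs))  # 16: all workers wrote to the same object
```

Because threads share the interpreter's memory, no snapshot files or `filelock` coordination are needed; the trade-off is that pure-Python work serializes on the GIL, which presumably matters little here since iterations are dominated by LLM and evaluator I/O.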

File tree: 6 files changed, +546 −302 lines


openevolve/cli.py

Lines changed: 1 addition & 0 deletions
@@ -126,6 +126,7 @@ async def main_async() -> int:
         best_program = await openevolve.run(
             iterations=args.iterations,
             target_score=args.target_score,
+            checkpoint_path=args.checkpoint,
         )

         # Get the checkpoint path
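The new `checkpoint_path=args.checkpoint` argument implies a `--checkpoint` CLI flag defined elsewhere in cli.py. That definition is not part of this diff; it presumably looks something like this (hypothetical reconstruction):

```python
# Hypothetical: the actual add_argument call is not shown in this commit
parser.add_argument(
    "--checkpoint",
    type=str,
    default=None,
    help="Path to a checkpoint directory to resume from",
)
```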

openevolve/controller.py

Lines changed: 80 additions & 142 deletions
@@ -5,21 +5,18 @@
 import asyncio
 import logging
 import os
-import shutil
-import re
+import signal
 import time
 import uuid
 from pathlib import Path
-from typing import Any, Dict, List, Optional, Tuple, Union
-import traceback
-import concurrent.futures
+from typing import Any, Dict, List, Optional, Union

 from openevolve.config import Config, load_config
 from openevolve.database import Program, ProgramDatabase
 from openevolve.evaluator import Evaluator
 from openevolve.llm.ensemble import LLMEnsemble
 from openevolve.prompt.sampler import PromptSampler
-from openevolve.iteration import run_iteration_sync, Result
+from openevolve.threaded_parallel import ImprovedParallelController
 from openevolve.utils.code_utils import (
     extract_code_language,
 )
@@ -164,6 +161,9 @@ def __init__(
         self.evaluation_file = evaluation_file

         logger.info(f"Initialized OpenEvolve with {initial_program_path}")
+
+        # Initialize improved parallel processing components
+        self.parallel_controller = None

     def _setup_logging(self) -> None:
         """Set up logging"""
@@ -198,24 +198,31 @@ async def run(
         self,
         iterations: Optional[int] = None,
         target_score: Optional[float] = None,
-    ) -> Program:
+        checkpoint_path: Optional[str] = None,
+    ) -> Optional[Program]:
         """
-        Run the evolution process
+        Run the evolution process with improved parallel processing

         Args:
             iterations: Maximum number of iterations (uses config if None)
             target_score: Target score to reach (continues until reached if specified)
+            checkpoint_path: Path to resume from checkpoint

         Returns:
             Best program found
         """
         max_iterations = iterations or self.config.max_iterations
-
-        # Define start_iteration before creating the initial program
-        start_iteration = self.database.last_iteration
+
+        # Determine starting iteration
+        start_iteration = 0
+        if checkpoint_path and os.path.exists(checkpoint_path):
+            self._load_checkpoint(checkpoint_path)
+            start_iteration = self.database.last_iteration + 1
+            logger.info(f"Resuming from checkpoint at iteration {start_iteration}")
+        else:
+            start_iteration = self.database.last_iteration

         # Only add initial program if starting fresh (not resuming from checkpoint)
-        # Check if we're resuming AND no program matches initial code to avoid pollution
         should_add_initial = (
             start_iteration == 0
             and len(self.database.programs) == 0
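With the new signature, resuming is driven entirely by the caller. A usage sketch; the constructor arguments are assumptions for illustration, and only `run()`'s parameters come from this diff:

```python
import asyncio

async def main():
    # Constructor arguments are illustrative, not taken from this commit
    controller = OpenEvolve(
        initial_program_path="initial.py",
        evaluation_file="evaluator.py",
    )
    # Fresh run: starts at iteration 0
    await controller.run(iterations=100)
    # Resumed run: loads the checkpoint, then continues at
    # database.last_iteration + 1
    await controller.run(iterations=100, checkpoint_path="checkpoints/checkpoint_50")

asyncio.run(main())
```

Note the asymmetry: with a checkpoint the run resumes at `last_iteration + 1`, while without one it starts at whatever `last_iteration` the database already holds.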
@@ -244,144 +251,49 @@ async def run(
             self.database.add(initial_program)
         else:
             logger.info(
-                f"Skipping initial program addition (resuming from iteration {start_iteration} with {len(self.database.programs)} existing programs)"
+                f"Skipping initial program addition (resuming from iteration {start_iteration} "
+                f"with {len(self.database.programs)} existing programs)"
             )

-        # Main evolution loop
-        total_iterations = start_iteration + max_iterations
-
-        logger.info(
-            f"Starting evolution from iteration {start_iteration} for {max_iterations} iterations (total: {total_iterations})"
-        )
-
-        # Island-based evolution variables
-        programs_per_island = max(
-            1, max_iterations // (self.config.database.num_islands * 10)
-        )  # Dynamic allocation
-        current_island_counter = 0
-
-        logger.info(f"Using island-based evolution with {self.config.database.num_islands} islands")
-        self.database.log_island_status()
-
-        # create temp file to save database snapshots to for process workers to load from
-        temp_db_path = "tmp/" + str(uuid.uuid4())
-        self.database.save(temp_db_path, start_iteration)
-
-        with concurrent.futures.ProcessPoolExecutor(
-            max_workers=self.config.evaluator.parallel_evaluations
-        ) as executor:
-            futures = []
-            for i in range(start_iteration, total_iterations):
-                futures.append(
-                    executor.submit(
-                        run_iteration_sync, i, self.config, self.evaluation_file, temp_db_path
-                    )
-                )
-
-            iteration = start_iteration + 1
-            for future in concurrent.futures.as_completed(futures):
-                logger.info(f"Completed iteration {iteration}")
-                try:
-                    result: Result = future.result()
-                    # if result is nonType
-                    if not isinstance(result, Result):
-                        logger.warning(f"No valid diffs or program length exceeded limit")
-                        continue
-                    # Manage island evolution - switch islands periodically
-                    if (
-                        iteration - 1 > start_iteration
-                        and current_island_counter >= programs_per_island
-                    ):
-                        self.database.next_island()
-                        current_island_counter = 0
-                        logger.debug(f"Switched to island {self.database.current_island}")
-
-                    current_island_counter += 1
-
-                    # Add to database (will be added to current island)
-                    self.database.add(result.child_program, iteration=iteration)
-
-                    # Log prompts
-                    self.database.log_prompt(
-                        template_key=(
-                            "full_rewrite_user" if not self.config.diff_based_evolution else "diff_user"
-                        ),
-                        program_id=result.child_program.id,
-                        prompt=result.prompt,
-                        responses=[result.llm_response],
-                    )
-
-                    # Store artifacts if they exist (after program is added to database)
-                    if result.artifacts:
-                        self.database.store_artifacts(result.child_program.id, result.artifacts)
-
-                    # Log prompts
-                    self.database.log_prompt(
-                        template_key=(
-                            "full_rewrite_user" if not self.config.diff_based_evolution else "diff_user"
-                        ),
-                        program_id=result.child_program.id,
-                        prompt=result.prompt,
-                        responses=[result.llm_response],
-                    )
-
-                    # Increment generation for current island
-                    self.database.increment_island_generation()
-
-                    # Check if migration should occur
-                    if self.database.should_migrate():
-                        logger.info(f"Performing migration at iteration {iteration}")
-                        self.database.migrate_programs()
-                        self.database.log_island_status()
-
-                    # Log progress
-                    self._log_iteration(
-                        iteration, result.parent, result.child_program, result.iteration_time
-                    )
+        # Initialize improved parallel processing
+        try:
+            self.parallel_controller = ImprovedParallelController(
+                self.config, self.evaluation_file, self.database
+            )
+
+            # Set up signal handlers for graceful shutdown
+            def signal_handler(signum, frame):
+                logger.info(f"Received signal {signum}, initiating graceful shutdown...")
+                self.parallel_controller.request_shutdown()
+
+            signal.signal(signal.SIGINT, signal_handler)
+            signal.signal(signal.SIGTERM, signal_handler)
+
+            self.parallel_controller.start()
+
+            # Run evolution with improved parallel processing and checkpoint callback
+            await self._run_evolution_with_checkpoints(
+                start_iteration, max_iterations, target_score
+            )
+
+        finally:
+            # Clean up parallel processing resources
+            if self.parallel_controller:
+                self.parallel_controller.stop()
+                self.parallel_controller = None

-                    # Specifically check if this is the new best program
-                    if self.database.best_program_id == result.child_program.id:
-                        logger.info(
-                            f"🌟 New best solution found at iteration {iteration}: {result.child_program.id}"
-                        )
-                        logger.info(f"Metrics: {format_metrics_safe(result.child_program.metrics)}")
-
-                    # Save checkpoint
-                    if (iteration) % self.config.checkpoint_interval == 0:
-                        self._save_checkpoint(iteration)
-                        # Also log island status at checkpoints
-                        logger.info(f"Island status at checkpoint {iteration}:")
-                        self.database.log_island_status()
-
-                    # Check if target score reached
-                    if target_score is not None:
-                        avg_score = sum(result["child_metrics"].values()) / max(
-                            1, len(result.child_metrics)
-                        )
-                        if avg_score >= target_score:
-                            logger.info(
-                                f"Target score {target_score} reached after {iteration} iterations"
-                            )
-                            break
-                    self.database.save(temp_db_path, iteration)
-                    iteration += 1
-                except Exception as e:
-                    logger.error(f"Error in iteration {iteration}: {str(e)}")
-                    continue
-        shutil.rmtree(temp_db_path)
-        # Get the best program using our tracking mechanism
+        # Get the best program
         best_program = None
         if self.database.best_program_id:
             best_program = self.database.get(self.database.best_program_id)
             logger.info(f"Using tracked best program: {self.database.best_program_id}")

-        # Fallback to calculating best program if tracked program not found
         if best_program is None:
             best_program = self.database.get_best_program()
             logger.info("Using calculated best program (tracked program not found)")

         # Check if there's a better program by combined_score that wasn't tracked
-        if "combined_score" in best_program.metrics:
+        if best_program and "combined_score" in best_program.metrics:
             best_by_combined = self.database.get_best_program(metric="combined_score")
             if (
                 best_by_combined
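The handler above only requests shutdown; worker threads must notice and exit on their own. A common implementation of this cooperative pattern, and presumably what `request_shutdown()` does internally, is a shared `threading.Event` that workers poll between iterations (a sketch, assuming nothing about the actual `ImprovedParallelController` internals):

```python
import signal
import threading
import time

shutdown_event = threading.Event()

def worker(worker_id: int) -> None:
    # Check the flag between work units so SIGINT/SIGTERM exit cleanly
    while not shutdown_event.is_set():
        time.sleep(0.1)  # stand-in for one evolution iteration
    print(f"worker {worker_id} exiting cleanly")

def handle_signal(signum, frame):
    shutdown_event.set()

# Signal handlers can only be installed from the main thread
signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTERM, handle_signal)

threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

The `try`/`finally` around the run then guarantees `stop()` is called even if evolution raises, so threads are not left dangling.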
@@ -397,7 +309,8 @@ async def run(
                     f"Found program with better combined_score: {best_by_combined.id}"
                 )
                 logger.warning(
-                    f"Score difference: {best_program.metrics['combined_score']:.4f} vs {best_by_combined.metrics['combined_score']:.4f}"
+                    f"Score difference: {best_program.metrics['combined_score']:.4f} vs "
+                    f"{best_by_combined.metrics['combined_score']:.4f}"
                 )
                 best_program = best_by_combined

@@ -406,14 +319,10 @@ async def run(
                 f"Evolution complete. Best program has metrics: "
                 f"{format_metrics_safe(best_program.metrics)}"
             )
-
-            # Save the best program (using our tracked best program)
             self._save_best_program(best_program)
-
             return best_program
         else:
             logger.warning("No valid programs found during evolution")
-            # Return None if no programs found instead of undefined initial_program
             return None

     def _log_iteration(
@@ -499,6 +408,35 @@ def _save_checkpoint(self, iteration: int) -> None:

         logger.info(f"Saved checkpoint at iteration {iteration} to {checkpoint_path}")

+    def _load_checkpoint(self, checkpoint_path: str) -> None:
+        """Load state from a checkpoint directory"""
+        if not os.path.exists(checkpoint_path):
+            raise FileNotFoundError(f"Checkpoint directory {checkpoint_path} not found")
+
+        logger.info(f"Loading checkpoint from {checkpoint_path}")
+        self.database.load(checkpoint_path)
+        logger.info(
+            f"Checkpoint loaded successfully (iteration {self.database.last_iteration})"
+        )
+
+    async def _run_evolution_with_checkpoints(
+        self, start_iteration: int, max_iterations: int, target_score: Optional[float]
+    ) -> None:
+        """Run evolution with checkpoint saving support"""
+        logger.info(f"Using island-based evolution with {self.config.database.num_islands} islands")
+        self.database.log_island_status()
+
+        # Run the evolution process with checkpoint callback
+        await self.parallel_controller.run_evolution(
+            start_iteration, max_iterations, target_score,
+            checkpoint_callback=self._save_checkpoint
+        )
+
+        # Save final checkpoint if needed
+        final_iteration = start_iteration + max_iterations - 1
+        if final_iteration > 0 and final_iteration % self.config.checkpoint_interval == 0:
+            self._save_checkpoint(final_iteration)
+
     def _save_best_program(self, program: Optional[Program] = None) -> None:
         """
         Save the best program
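`_run_evolution_with_checkpoints` hands `self._save_checkpoint` to the controller as `checkpoint_callback`, so periodic saving now lives inside the parallel loop. The controller implementation is in threaded_parallel.py and not shown in this diff; the contract it implies is roughly this (a sketch; the loop structure is an assumption):

```python
# Hypothetical sketch of the checkpoint_callback contract; the real
# ImprovedParallelController.run_evolution is not part of this commit.
from typing import Callable, Optional

def run_evolution_loop(
    start_iteration: int,
    max_iterations: int,
    checkpoint_interval: int,
    checkpoint_callback: Optional[Callable[[int], None]] = None,
) -> None:
    for i in range(start_iteration, start_iteration + max_iterations):
        ...  # evolve and evaluate one iteration (elided)
        if checkpoint_callback and i > 0 and i % checkpoint_interval == 0:
            checkpoint_callback(i)  # e.g. OpenEvolve._save_checkpoint
```

The trailing `_save_checkpoint` call in the method covers the edge case where the final iteration lands exactly on a checkpoint boundary.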
