Skip to content

Commit b7da4fd

Browse files
committed
fix parallal island siaollation
1 parent b3101f2 commit b7da4fd

File tree

2 files changed

+364
-18
lines changed

2 files changed

+364
-18
lines changed

openevolve/process_parallel.py

Lines changed: 60 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -268,8 +268,18 @@ def __init__(self, config: Config, evaluation_file: str, database: ProgramDataba
268268

269269
# Number of worker processes
270270
self.num_workers = config.evaluator.parallel_evaluations
271+
272+
# Worker-to-island pinning for true island isolation
273+
self.num_islands = config.database.num_islands
274+
self.worker_island_map = {}
275+
276+
# Distribute workers across islands using modulo
277+
for worker_id in range(self.num_workers):
278+
island_id = worker_id % self.num_islands
279+
self.worker_island_map[worker_id] = island_id
271280

272281
logger.info(f"Initialized process parallel controller with {self.num_workers} workers")
282+
logger.info(f"Worker-to-island mapping: {self.worker_island_map}")
273283

274284
def _serialize_config(self, config: Config) -> dict:
275285
"""Serialize config object to a dictionary that can be pickled"""
@@ -372,17 +382,26 @@ async def run_evolution(
372382
f"for {max_iterations} iterations (total: {total_iterations})"
373383
)
374384

375-
# Track pending futures
385+
# Track pending futures by island to maintain distribution
376386
pending_futures: Dict[int, Future] = {}
387+
island_pending: Dict[int, List[int]] = {i: [] for i in range(self.num_islands)}
377388
batch_size = min(self.num_workers * 2, max_iterations)
378389

379-
# Submit initial batch
380-
for i in range(start_iteration, min(start_iteration + batch_size, total_iterations)):
381-
future = self._submit_iteration(i)
382-
if future:
383-
pending_futures[i] = future
384-
385-
next_iteration = start_iteration + batch_size
390+
# Submit initial batch - distribute across islands
391+
batch_per_island = max(1, batch_size // self.num_islands)
392+
current_iteration = start_iteration
393+
394+
# Round-robin distribution across islands
395+
for island_id in range(self.num_islands):
396+
for _ in range(batch_per_island):
397+
if current_iteration < total_iterations:
398+
future = self._submit_iteration(current_iteration, island_id)
399+
if future:
400+
pending_futures[current_iteration] = future
401+
island_pending[island_id].append(current_iteration)
402+
current_iteration += 1
403+
404+
next_iteration = current_iteration
386405
completed_iterations = 0
387406

388407
# Island management
@@ -529,13 +548,24 @@ async def run_evolution(
529548
logger.error(f"Error processing result from iteration {completed_iteration}: {e}")
530549

531550
completed_iterations += 1
551+
552+
# Remove completed iteration from island tracking
553+
for island_id, iteration_list in island_pending.items():
554+
if completed_iteration in iteration_list:
555+
iteration_list.remove(completed_iteration)
556+
break
532557

533-
# Submit next iteration
534-
if next_iteration < total_iterations and not self.shutdown_event.is_set():
535-
future = self._submit_iteration(next_iteration)
536-
if future:
537-
pending_futures[next_iteration] = future
538-
next_iteration += 1
558+
# Submit next iterations maintaining island balance
559+
for island_id in range(self.num_islands):
560+
if (len(island_pending[island_id]) < batch_per_island
561+
and next_iteration < total_iterations
562+
and not self.shutdown_event.is_set()):
563+
future = self._submit_iteration(next_iteration, island_id)
564+
if future:
565+
pending_futures[next_iteration] = future
566+
island_pending[island_id].append(next_iteration)
567+
next_iteration += 1
568+
break # Only submit one iteration per completion to maintain balance
539569

540570
# Handle shutdown
541571
if self.shutdown_event.is_set():
@@ -547,14 +577,26 @@ async def run_evolution(
547577

548578
return self.database.get_best_program()
549579

550-
def _submit_iteration(self, iteration: int) -> Optional[Future]:
551-
"""Submit an iteration to the process pool"""
580+
def _submit_iteration(self, iteration: int, island_id: Optional[int] = None) -> Optional[Future]:
581+
"""Submit an iteration to the process pool, optionally pinned to a specific island"""
552582
try:
553-
# Sample parent and inspirations
554-
parent, inspirations = self.database.sample()
583+
# Use specified island or current island
584+
target_island = island_id if island_id is not None else self.database.current_island
585+
586+
# Temporarily set database to target island for sampling
587+
original_island = self.database.current_island
588+
self.database.current_island = target_island
589+
590+
try:
591+
# Sample parent and inspirations from the target island
592+
parent, inspirations = self.database.sample()
593+
finally:
594+
# Always restore original island state
595+
self.database.current_island = original_island
555596

556597
# Create database snapshot
557598
db_snapshot = self._create_database_snapshot()
599+
db_snapshot["sampling_island"] = target_island # Mark which island this is for
558600

559601
# Submit to process pool
560602
future = self.executor.submit(

0 commit comments

Comments
 (0)