diff --git a/python/cocoindex/subprocess_exec.py b/python/cocoindex/subprocess_exec.py
index cfb36ab9b..ff8553d50 100644
--- a/python/cocoindex/subprocess_exec.py
+++ b/python/cocoindex/subprocess_exec.py
@@ -11,6 +11,7 @@ from __future__ import annotations
 
 
 from concurrent.futures import ProcessPoolExecutor
+from concurrent.futures.process import BrokenProcessPool
 from dataclasses import dataclass, field
 from typing import Any, Callable
 import pickle
@@ -19,6 +20,8 @@
 import os
 import time
 from .user_app_loader import load_user_app
+from .runtime import execution_context
+import logging
 
 WATCHDOG_INTERVAL_SECONDS = 10.0
 
@@ -28,6 +31,7 @@
 _pool_lock = threading.Lock()
 _pool: ProcessPoolExecutor | None = None
 _user_apps: list[str] = []
+_logger = logging.getLogger(__name__)
 
 
 def _get_pool() -> ProcessPoolExecutor:
@@ -48,6 +52,46 @@ def add_user_app(app_target: str) -> None:
     _user_apps.append(app_target)
 
 
+def _restart_pool(old_pool: ProcessPoolExecutor | None = None) -> None:
+    """Safely restart the global ProcessPoolExecutor.
+
+    Thread-safe via `_pool_lock`. Shuts down the old pool and re-creates a new
+    one with the same initializer/args.
+    """
+    global _pool
+    with _pool_lock:
+        # If another thread already swapped the pool, skip restart
+        if old_pool is not None and _pool is not old_pool:
+            return
+        _logger.error("Detected dead subprocess pool; restarting and retrying.")
+        prev_pool = _pool
+        _pool = ProcessPoolExecutor(
+            max_workers=1,
+            initializer=_subprocess_init,
+            initargs=(_user_apps, os.getpid()),
+        )
+    if prev_pool is not None:
+        # Best-effort shutdown of previous pool; letting exceptions bubble up
+        # is acceptable here and signals irrecoverable executor state.
+        prev_pool.shutdown(cancel_futures=True)
+
+
+async def _submit_with_restart(fn: Callable[..., Any], *args: Any) -> Any:
+    """Submit and await work, restarting the subprocess until it succeeds.
+
+    Retries on BrokenProcessPool or pool-shutdown RuntimeError; re-raises other
+    exceptions.
+    """
+    while True:
+        pool = _get_pool()
+        try:
+            fut = pool.submit(fn, *args)
+            return await asyncio.wrap_future(fut)
+        except BrokenProcessPool:
+            _restart_pool(old_pool=pool)
+            # loop and retry
+
+
 # ---------------------------------------------
 # Subprocess: executor registry and helpers
 # ---------------------------------------------
@@ -164,27 +208,26 @@ def __init__(self, executor_factory: type[Any], spec: Any) -> None:
             (executor_factory, spec), protocol=pickle.HIGHEST_PROTOCOL
         )
 
-        # Conditionally expose analyze if underlying class has it (sync-only in caller)
+        # Conditionally expose analyze if underlying class has it
         if hasattr(executor_factory, "analyze"):
             # Bind as attribute so getattr(..., "analyze", None) works upstream
-            def _analyze() -> Any:
-                fut = self._pool.submit(_sp_analyze, self._key_bytes)
-                return fut.result()
+            def analyze() -> Any:
+                return execution_context.run(
+                    _submit_with_restart(_sp_analyze, self._key_bytes)
+                )
 
             # Attach method
-            setattr(self, "analyze", _analyze)
+            setattr(self, "analyze", analyze)
 
         if hasattr(executor_factory, "prepare"):
 
             async def prepare() -> Any:
-                fut = self._pool.submit(_sp_prepare, self._key_bytes)
-                return await asyncio.wrap_future(fut)
+                return await _submit_with_restart(_sp_prepare, self._key_bytes)
 
             setattr(self, "prepare", prepare)
 
     async def __call__(self, *args: Any, **kwargs: Any) -> Any:
-        fut = self._pool.submit(_sp_call, self._key_bytes, args, kwargs)
-        return await asyncio.wrap_future(fut)
+        return await _submit_with_restart(_sp_call, self._key_bytes, args, kwargs)
 
 
 def executor_stub(executor_factory: type[Any], spec: Any) -> Any: