diff --git a/python/cocoindex/subprocess_exec.py b/python/cocoindex/subprocess_exec.py index ff8553d50..a64268aa0 100644 --- a/python/cocoindex/subprocess_exec.py +++ b/python/cocoindex/subprocess_exec.py @@ -22,6 +22,7 @@ from .user_app_loader import load_user_app from .runtime import execution_context import logging +import multiprocessing as mp WATCHDOG_INTERVAL_SECONDS = 10.0 @@ -43,6 +44,7 @@ def _get_pool() -> ProcessPoolExecutor: max_workers=1, initializer=_subprocess_init, initargs=(_user_apps, os.getpid()), + mp_context=mp.get_context("spawn"), ) return _pool @@ -69,6 +71,7 @@ def _restart_pool(old_pool: ProcessPoolExecutor | None = None) -> None: max_workers=1, initializer=_subprocess_init, initargs=(_user_apps, os.getpid()), + mp_context=mp.get_context("spawn"), ) if prev_pool is not None: # Best-effort shutdown of previous pool; letting exceptions bubble up @@ -124,8 +127,19 @@ def _watch() -> None: def _subprocess_init(user_apps: list[str], parent_pid: int) -> None: _start_parent_watchdog(parent_pid) + + # In case any user app is already in this subprocess, e.g. the subprocess is forked, we need to avoid loading it again. + with _pool_lock: + already_loaded_apps = set(_user_apps) + + loaded_apps = [] for app_target in user_apps: - load_user_app(app_target) + if app_target not in already_loaded_apps: + load_user_app(app_target) + loaded_apps.append(app_target) + + with _pool_lock: + _user_apps.extend(loaded_apps) class _OnceResult: