Skip to content

Commit 3408377

Browse files
authored
feat(subprocess): automatically retry on subprocess terminates (#901)
1 parent 11407bb commit 3408377

File tree

1 file changed

+52
-9
lines changed

1 file changed

+52
-9
lines changed

python/cocoindex/subprocess_exec.py

Lines changed: 52 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from __future__ import annotations
1212

1313
from concurrent.futures import ProcessPoolExecutor
14+
from concurrent.futures.process import BrokenProcessPool
1415
from dataclasses import dataclass, field
1516
from typing import Any, Callable
1617
import pickle
@@ -19,6 +20,8 @@
1920
import os
2021
import time
2122
from .user_app_loader import load_user_app
23+
from .runtime import execution_context
24+
import logging
2225

2326
WATCHDOG_INTERVAL_SECONDS = 10.0
2427

@@ -28,6 +31,7 @@
2831
_pool_lock = threading.Lock()
2932
_pool: ProcessPoolExecutor | None = None
3033
_user_apps: list[str] = []
34+
_logger = logging.getLogger(__name__)
3135

3236

3337
def _get_pool() -> ProcessPoolExecutor:
@@ -48,6 +52,46 @@ def add_user_app(app_target: str) -> None:
4852
_user_apps.append(app_target)
4953

5054

55+
def _restart_pool(old_pool: ProcessPoolExecutor | None = None) -> None:
56+
"""Safely restart the global ProcessPoolExecutor.
57+
58+
Thread-safe via `_pool_lock`. Shuts down the old pool and re-creates a new
59+
one with the same initializer/args.
60+
"""
61+
global _pool
62+
with _pool_lock:
63+
# If another thread already swapped the pool, skip restart
64+
if old_pool is not None and _pool is not old_pool:
65+
return
66+
_logger.error("Detected dead subprocess pool; restarting and retrying.")
67+
prev_pool = _pool
68+
_pool = ProcessPoolExecutor(
69+
max_workers=1,
70+
initializer=_subprocess_init,
71+
initargs=(_user_apps, os.getpid()),
72+
)
73+
if prev_pool is not None:
74+
# Best-effort shutdown of previous pool; letting exceptions bubble up
75+
# is acceptable here and signals irrecoverable executor state.
76+
prev_pool.shutdown(cancel_futures=True)
77+
78+
79+
async def _submit_with_restart(fn: Callable[..., Any], *args: Any) -> Any:
80+
"""Submit and await work, restarting the subprocess until it succeeds.
81+
82+
Retries on BrokenProcessPool or pool-shutdown RuntimeError; re-raises other
83+
exceptions.
84+
"""
85+
while True:
86+
pool = _get_pool()
87+
try:
88+
fut = pool.submit(fn, *args)
89+
return await asyncio.wrap_future(fut)
90+
except BrokenProcessPool:
91+
_restart_pool(old_pool=pool)
92+
# loop and retry
93+
94+
5195
# ---------------------------------------------
5296
# Subprocess: executor registry and helpers
5397
# ---------------------------------------------
@@ -164,27 +208,26 @@ def __init__(self, executor_factory: type[Any], spec: Any) -> None:
164208
(executor_factory, spec), protocol=pickle.HIGHEST_PROTOCOL
165209
)
166210

167-
# Conditionally expose analyze if underlying class has it (sync-only in caller)
211+
# Conditionally expose analyze if underlying class has it
168212
if hasattr(executor_factory, "analyze"):
169213
# Bind as attribute so getattr(..., "analyze", None) works upstream
170-
def _analyze() -> Any:
171-
fut = self._pool.submit(_sp_analyze, self._key_bytes)
172-
return fut.result()
214+
def analyze() -> Any:
215+
return execution_context.run(
216+
_submit_with_restart(_sp_analyze, self._key_bytes)
217+
)
173218

174219
# Attach method
175-
setattr(self, "analyze", _analyze)
220+
setattr(self, "analyze", analyze)
176221

177222
if hasattr(executor_factory, "prepare"):
178223

179224
async def prepare() -> Any:
180-
fut = self._pool.submit(_sp_prepare, self._key_bytes)
181-
return await asyncio.wrap_future(fut)
225+
return await _submit_with_restart(_sp_prepare, self._key_bytes)
182226

183227
setattr(self, "prepare", prepare)
184228

185229
async def __call__(self, *args: Any, **kwargs: Any) -> Any:
186-
fut = self._pool.submit(_sp_call, self._key_bytes, args, kwargs)
187-
return await asyncio.wrap_future(fut)
230+
return await _submit_with_restart(_sp_call, self._key_bytes, args, kwargs)
188231

189232

190233
def executor_stub(executor_factory: type[Any], spec: Any) -> Any:

0 commit comments

Comments
 (0)