Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions python/cocoindex/subprocess_exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
import pickle
import threading
import asyncio
import os
import time
from .user_app_loader import load_user_app

WATCHDOG_INTERVAL_SECONDS = 10.0

# ---------------------------------------------
# Main process: single, lazily-created pool
Expand All @@ -33,7 +36,9 @@ def _get_pool() -> ProcessPoolExecutor:
if _pool is None:
# Single worker process as requested
_pool = ProcessPoolExecutor(
max_workers=1, initializer=_subprocess_init, initargs=(_user_apps,)
max_workers=1,
initializer=_subprocess_init,
initargs=(_user_apps, os.getpid()),
)
return _pool

Expand All @@ -48,7 +53,33 @@ def add_user_app(app_target: str) -> None:
# ---------------------------------------------


def _subprocess_init(user_apps: list[str]) -> None:
def _start_parent_watchdog(
parent_pid: int, interval_seconds: float = WATCHDOG_INTERVAL_SECONDS
) -> None:
"""Terminate this process if the parent process exits or PPID changes.

This runs in a background daemon thread so it never blocks pool work.
"""

def _watch() -> None:
while True:
# If PPID changed (parent died and we were reparented), exit.
if os.getppid() != parent_pid:
os._exit(1)

# Best-effort liveness probe in case PPID was reused.
try:
os.kill(parent_pid, 0)
except OSError:
os._exit(1)

time.sleep(interval_seconds)

threading.Thread(target=_watch, name="parent-watchdog", daemon=True).start()


def _subprocess_init(user_apps: list[str], parent_pid: int) -> None:
_start_parent_watchdog(parent_pid)
for app_target in user_apps:
load_user_app(app_target)

Expand Down
Loading