Skip to content

Commit aa24f2c

Browse files
authored
chore: shutdown ProcessPoolExecutor, make the test work for Windows (#951)
1 parent d2a8101 commit aa24f2c

File tree

2 files changed

+65
-3
lines changed

2 files changed

+65
-3
lines changed

python/cocoindex/subprocess_exec.py

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import asyncio
2020
import os
2121
import time
22+
import atexit
2223
from .user_app_loader import load_user_app
2324
from .runtime import execution_context
2425
import logging
@@ -31,14 +32,39 @@
3132
# ---------------------------------------------
3233
_pool_lock = threading.Lock()
3334
_pool: ProcessPoolExecutor | None = None
35+
_pool_cleanup_registered = False
3436
_user_apps: list[str] = []
3537
_logger = logging.getLogger(__name__)
3638

3739

40+
def shutdown_pool_at_exit() -> None:
41+
"""Best-effort shutdown of the global ProcessPoolExecutor on interpreter exit."""
42+
global _pool, _pool_cleanup_registered # pylint: disable=global-statement
43+
with _pool_lock:
44+
if _pool is not None:
45+
try:
46+
_pool.shutdown(wait=True, cancel_futures=True)
47+
except Exception as e:
48+
_logger.error(
49+
"Error during ProcessPoolExecutor shutdown at exit: %s",
50+
e,
51+
exc_info=True,
52+
)
53+
finally:
54+
_pool = None
55+
_pool_cleanup_registered = False
56+
57+
3858
def _get_pool() -> ProcessPoolExecutor:
39-
global _pool
59+
global _pool, _pool_cleanup_registered # pylint: disable=global-statement
4060
with _pool_lock:
4161
if _pool is None:
62+
if not _pool_cleanup_registered:
63+
# Register the shutdown at exit at creation time (rather than at import time)
64+
# to make sure it's executed earlier in the shutdown sequence.
65+
atexit.register(shutdown_pool_at_exit)
66+
_pool_cleanup_registered = True
67+
4268
# Single worker process as requested
4369
_pool = ProcessPoolExecutor(
4470
max_workers=1,
@@ -213,11 +239,9 @@ def _sp_call(key_bytes: bytes, args: tuple[Any, ...], kwargs: dict[str, Any]) ->
213239

214240

215241
class _ExecutorStub:
216-
_pool: ProcessPoolExecutor
217242
_key_bytes: bytes
218243

219244
def __init__(self, executor_factory: type[Any], spec: Any) -> None:
220-
self._pool = _get_pool()
221245
self._key_bytes = pickle.dumps(
222246
(executor_factory, spec), protocol=pickle.HIGHEST_PROTOCOL
223247
)

python/cocoindex/tests/conftest.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import pytest
2+
import typing
3+
import os
4+
import signal
5+
import sys
6+
7+
8+
@pytest.fixture(scope="session", autouse=True)
9+
def _cocoindex_windows_env_fixture(
10+
request: pytest.FixtureRequest,
11+
) -> typing.Generator[None, None, None]:
12+
"""Shutdown the subprocess pool at exit on Windows."""
13+
14+
yield
15+
16+
if not sys.platform.startswith("win"):
17+
return
18+
19+
try:
20+
import cocoindex.subprocess_exec
21+
22+
original_sigint_handler = signal.getsignal(signal.SIGINT)
23+
try:
24+
signal.signal(signal.SIGINT, signal.SIG_IGN)
25+
cocoindex.subprocess_exec.shutdown_pool_at_exit()
26+
27+
# If any test failed, let pytest exit normally with nonzero code
28+
if request.session.testsfailed == 0:
29+
os._exit(0) # immediate success exit (skips atexit/teardown)
30+
31+
finally:
32+
try:
33+
signal.signal(signal.SIGINT, original_sigint_handler)
34+
except ValueError: # noqa: BLE001
35+
pass
36+
37+
except (ImportError, AttributeError): # noqa: BLE001
38+
pass

0 commit comments

Comments
 (0)