Skip to content

Commit fdb803e

Browse files
committed
feat: implement safe 'spawn' context and signal handling for subprocess management
1 parent 4c18008 commit fdb803e

File tree

1 file changed

+81
-15
lines changed

1 file changed

+81
-15
lines changed

src/asynctasq/tasks/infrastructure/process_pool_manager.py

Lines changed: 81 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,15 @@
1111
- Context manager support for RAII pattern
1212
- Configurable worker limits and task recycling
1313
- Process-local state management for multiprocessing safety
14+
- Secure 'spawn' context by default for cross-platform safety
15+
- Graceful signal handling in subprocesses
16+
17+
Best Practices Applied (2025):
18+
- Uses 'spawn' start method for safer multiprocessing (avoids fork corruption)
19+
- Implements SIGINT handlers for clean shutdown without tracebacks
20+
- Uses max_tasks_per_child to prevent memory leaks
21+
- Provides context manager for proper resource cleanup
22+
- Thread-safe operations with proper locking primitives
1423
"""
1524

1625
from __future__ import annotations
@@ -20,8 +29,11 @@
2029
from concurrent.futures import ProcessPoolExecutor
2130
from dataclasses import dataclass, field
2231
import logging
32+
import multiprocessing
2333
import multiprocessing.context
2434
import os
35+
import signal
36+
import sys
2537
import threading
2638
import types
2739
from typing import Any, Final, Literal, Self, TypedDict
@@ -49,31 +61,65 @@ class PoolStats(TypedDict):
4961
_fallback_count = 0
5062
_fallback_lock = threading.Lock()
5163

52-
# Default max_tasks_per_child to prevent memory leaks (best practice from research)
64+
# Default max_tasks_per_child to prevent memory leaks (Python 3.11+ feature)
65+
# Research shows 100-1000 is optimal for most workloads to balance overhead vs memory safety
5366
DEFAULT_MAX_TASKS_PER_CHILD: Final = 100
5467

5568

69+
def _get_safe_mp_context() -> multiprocessing.context.BaseContext:
70+
"""Get the safest multiprocessing context for the current platform.
71+
72+
Returns 'spawn' context which is:
73+
- Cross-platform compatible (works on Windows, macOS, Linux)
74+
- Safer than 'fork' (avoids inheriting locks/state from parent)
75+
- Required for CUDA/GPU workloads
76+
- Aligned with Python 3.14+, where 'fork' is no longer the POSIX default (Linux now defaults to 'forkserver'; macOS and Windows already use 'spawn')
77+
78+
Note:
79+
While 'spawn' is slower than 'fork' due to fresh interpreter startup,
80+
it prevents deadlocks, corruption, and crashes that can occur with fork.
81+
82+
Returns:
83+
Multiprocessing context configured for spawn start method
84+
"""
85+
return multiprocessing.get_context("spawn")
86+
87+
5688
def _setup_subprocess_io() -> None:
57-
"""Configure subprocess stdout/stderr to be unbuffered.
89+
"""Configure subprocess environment for safe execution.
5890
59-
Ensures print() statements and logging output from subprocesses
60-
appear immediately in the parent process terminal. This is especially
61-
important on macOS/Windows where 'spawn' context creates fresh processes.
91+
Sets up:
92+
1. Unbuffered I/O for immediate output visibility
93+
2. SIGINT handler for graceful shutdown without tracebacks
94+
3. SIGTERM handler for clean termination
95+
96+
This is especially important on macOS/Windows where 'spawn' context
97+
creates fresh processes that need proper signal handling.
6298
6399
Called by ProcessPoolExecutor as the worker initializer function.
100+
101+
Best Practice:
        Workers install handlers that exit immediately and silently, so the
        parent process retains full control of shutdown sequencing. Child
        processes perform no cleanup of their own on SIGINT/SIGTERM; this
        prevents race conditions and ensures orderly resource cleanup.
64105
"""
65-
import signal
66-
import sys
67106

68107
# Suppress KeyboardInterrupt traceback in subprocess workers
69-
# This prevents noisy output when the parent process shuts down
70-
def silent_interrupt_handler(signum: int, frame: Any) -> None:
71-
"""Silently exit on SIGINT without printing traceback."""
108+
# Prevents noisy output when parent process shuts down pool
109+
def _silent_signal_handler(signum: int, frame: Any) -> None:
110+
"""Silently exit on signal without printing traceback.
111+
112+
This gives the parent process full control over shutdown sequencing
113+
while avoiding confusing subprocess tracebacks in the terminal.
114+
"""
72115
sys.exit(0)
73116

74-
signal.signal(signal.SIGINT, silent_interrupt_handler)
117+
# Install handlers for common termination signals
118+
signal.signal(signal.SIGINT, _silent_signal_handler)
119+
signal.signal(signal.SIGTERM, _silent_signal_handler)
75120

76121
# Force unbuffered output for immediate visibility
122+
# Critical for debugging and monitoring subprocess behavior
77123
if hasattr(sys.stdout, "reconfigure"):
78124
sys.stdout.reconfigure(line_buffering=True) # type: ignore[attr-defined]
79125
if hasattr(sys.stderr, "reconfigure"):
@@ -196,9 +242,13 @@ class ProcessPoolManager:
196242
"""Instance-based manager for sync and async process pools with context manager support.
197243
198244
Provides thread-safe process pool management with automatic cleanup.
199-
Use as async context manager for automatic resource management:
245+
Implements 2025 best practices for safe multiprocessing:
246+
- Uses 'spawn' context by default (safer than 'fork')
247+
- Graceful signal handling in subprocesses
248+
- Memory leak prevention via max_tasks_per_child
249+
- Proper resource cleanup via context manager
200250
201-
Example:
251+
Example (Recommended):
202252
```python
203253
async with ProcessPoolManager() as manager:
204254
pool = manager.get_sync_pool()
@@ -214,6 +264,13 @@ class ProcessPoolManager:
214264
# ... use pools ...
215265
await manager.shutdown()
216266
```
267+
268+
Attributes:
269+
sync_max_workers: Max workers for sync pool (default: CPU count)
270+
async_max_workers: Max workers for async pool (default: CPU count)
271+
sync_max_tasks_per_child: Tasks before worker restart (default: 100)
272+
async_max_tasks_per_child: Tasks before worker restart (default: 100)
273+
mp_context: Multiprocessing context (default: spawn for safety)
217274
"""
218275

219276
# Configuration parameters
@@ -344,7 +401,7 @@ def _create_pool(
344401
pool_type: "sync" or "async"
345402
max_workers: Max workers (None = CPU count)
346403
max_tasks_per_child: Tasks per worker before restart
347-
mp_context: Multiprocessing context
404+
mp_context: Multiprocessing context (defaults to safe 'spawn')
348405
initializer: Callable to run on worker startup
349406
initargs: Arguments for initializer
350407
@@ -354,10 +411,18 @@ def _create_pool(
354411
Raises:
355412
ValueError: If max_workers is <= 0
356413
TypeError: If max_workers is not an integer
414+
415+
Best Practice:
416+
Uses 'spawn' context by default for safety. While slower than 'fork',
417+
it prevents deadlocks from inherited locks and corruption from shared state.
357418
"""
358419
# Determine actual max_workers (None defaults to CPU count)
359420
actual_max_workers = max_workers if max_workers is not None else self._get_cpu_count()
360421

422+
# Use safe 'spawn' context by default
423+
# This prevents fork-related issues: deadlocks, corruption, crashes
424+
actual_mp_context = mp_context if mp_context is not None else _get_safe_mp_context()
425+
361426
# Validation happens in ProcessPoolExecutor constructor
362427
# It will raise ValueError if max_workers <= 0 or TypeError if not int
363428

@@ -367,13 +432,14 @@ def _create_pool(
367432
"pool_size": actual_max_workers,
368433
"max_tasks_per_child": max_tasks_per_child,
369434
"pool_type": pool_type,
435+
"mp_context": actual_mp_context._name, # type: ignore[attr-defined]
370436
},
371437
)
372438

373439
return ProcessPoolExecutor(
374440
max_workers=actual_max_workers,
375441
max_tasks_per_child=max_tasks_per_child,
376-
mp_context=mp_context,
442+
mp_context=actual_mp_context,
377443
initializer=initializer,
378444
initargs=initargs,
379445
)

0 commit comments

Comments
 (0)