Skip to content

Commit fd3eb9f

Browse files
feat: signal handler race condition (#170)
* refactor: Fix race condition in signal handler calling sys.exit during async execu Co-authored-by: Sandi Fatic <chefsale@users.noreply.github.com> * fix: address review comments - restore sys.exit and add concurrent test - Revert os._exit() to sys.exit() to allow proper stack unwinding and finally block execution (prevents leaking external resources) - Update docstrings to clarify 'at-most-once' execution semantics - Add concurrent cleanup test to verify thread-safety under parallel access - The lock now correctly ensures cleanup runs only once while preserving proper Python cleanup semantics Co-authored-by: Sandi Fatic <chefsale@users.noreply.github.com> * fix: use RLock to prevent signal handler deadlock - Change threading.Lock to threading.RLock to prevent deadlock when signal arrives while atexit holds the lock (signal handlers run in same thread) - Remove test_docker_manager_has_cleanup_lock (tests implementation details) - Clean up dead code in concurrent test (unused tracking variables) - Update docstring to document RLock usage Co-authored-by: Sandi Fatic <chefsale@users.noreply.github.com> * fix: prevent signal handler from interrupting ongoing cleanup - Add _cleanup_in_progress flag to detect re-entrant calls during cleanup - Move entire cleanup operation inside the lock to prevent partial cleanup - Return status from _cleanup_resources (True=done, False=already done, None=in progress) - Signal handler returns without sys.exit() if cleanup is in progress, allowing ongoing cleanup to complete without SystemExit interruption - Add sys.stdout.flush() before os._exit(1) to ensure message is printed - Set _cleanup_done after cleanup completes (not before) for accurate state - Add test for in-progress cleanup detection Co-authored-by: Sandi Fatic <chefsale@users.noreply.github.com> * fix: apply race condition fixes to BinaryManager and improve tests - Apply same thread-safe cleanup pattern to BinaryManager: - Add RLock, _cleanup_in_progress, _cleanup_done flags - Update signal handler to check cleanup result before sys.exit() - Add sys.stdout.flush() before os._exit(1) - Move cleanup operations inside lock with try/finally - Fix _cleanup_on_exit in both managers to call cleanup unconditionally (it's idempotent and returns immediately if already done/in progress) - Update concurrent test to verify return value distribution: exactly one True (performed cleanup), rest False (already done) Co-authored-by: Sandi Fatic <chefsale@users.noreply.github.com> * refactor: use CleanupResult enum and add BinaryManager tests - Add CleanupResult enum (PERFORMED, ALREADY_DONE, IN_PROGRESS) to constants.py for self-documenting cleanup return values - Update DockerManager and BinaryManager to use CleanupResult enum - Update DockerManager tests to use CleanupResult enum - Add BinaryManager tests for: - Double cleanup prevention - Concurrent access handling with return value distribution - In-progress state detection Co-authored-by: Sandi Fatic <chefsale@users.noreply.github.com> * fix: restore deleted tests and add IN_PROGRESS assertion - Restore all graceful shutdown tests from master that were lost during rebase - Restore all CORS validation and configuration tests - Add IN_PROGRESS assertion to concurrent access tests to verify no re-entrancy bugs - Rename flag state tests to clarify they test flag check logic specifically Co-authored-by: Sandi Fatic <chefsale@users.noreply.github.com> * refactor: extract common cleanup logic into CleanupMixin - Create CleanupMixin ABC in cleanup_mixin.py with: - _init_cleanup_state(): Initialize cleanup-related instance variables - _setup_signal_handlers(): Register SIGINT/SIGTERM handlers and atexit - _signal_handler(): Handle signals with graceful shutdown - _cleanup_on_exit(): Atexit handler - _cleanup_resources_guarded(): Thread-safe cleanup wrapper with RLock - _cleanup_resources(): Default implementation calling _do_cleanup() - remove_signal_handlers(): Restore original signal handlers - Abstract _do_cleanup(): Subclasses implement actual cleanup - Update DockerManager to inherit from CleanupMixin: - Call _init_cleanup_state() in __init__ - Implement _do_cleanup() with container graceful shutdown - Override _cleanup_resources() to add drain_timeout/stop_timeout params - Update BinaryManager to inherit from CleanupMixin: - Call _init_cleanup_state() in __init__ - Implement _do_cleanup() with process termination logic - Use default _cleanup_resources() from mixin This eliminates ~90 lines of duplicated code between the managers while maintaining identical behavior and thread-safety guarantees. Co-authored-by: Sandi Fatic <chefsale@users.noreply.github.com> --------- Co-authored-by: Cursor Agent <cursoragent@cursor.com> Co-authored-by: Sandi Fatic <chefsale@users.noreply.github.com>
1 parent 3b010d5 commit fd3eb9f

File tree

6 files changed

+455
-102
lines changed

6 files changed

+455
-102
lines changed

merobox/commands/binary_manager.py

Lines changed: 6 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
Binary Manager - Manages Calimero nodes as native processes (no Docker).
33
"""
44

5-
import atexit
65
import os
76
import re
87
import shutil
@@ -17,6 +16,7 @@
1716

1817
from rich.console import Console
1918

19+
from merobox.commands.cleanup_mixin import CleanupMixin
2020
from merobox.commands.config_utils import (
2121
apply_bootstrap_nodes,
2222
apply_e2e_defaults,
@@ -32,7 +32,7 @@
3232
console = Console()
3333

3434

35-
class BinaryManager:
35+
class BinaryManager(CleanupMixin):
3636
"""Manages Calimero nodes as native binary processes."""
3737

3838
def __init__(
@@ -51,6 +51,8 @@ def __init__(
5151
shutdown on SIGINT/SIGTERM. Set to False in tests or when managing
5252
signals externally.
5353
"""
54+
self._init_cleanup_state()
55+
5456
if (
5557
binary_path
5658
and os.path.isfile(binary_path)
@@ -68,50 +70,12 @@ def __init__(
6870
self.node_rpc_ports: dict[str, int] = {}
6971
self.pid_file_dir = Path("./data/.pids")
7072
self.pid_file_dir.mkdir(parents=True, exist_ok=True)
71-
self._shutting_down = False
72-
self._original_sigint_handler = None
73-
self._original_sigterm_handler = None
7473

7574
if enable_signal_handlers:
7675
self._setup_signal_handlers()
7776

78-
def _setup_signal_handlers(self):
79-
"""Register signal handlers for graceful shutdown."""
80-
# Store original handlers so we can restore them if needed
81-
self._original_sigint_handler = signal.signal(
82-
signal.SIGINT, self._signal_handler
83-
)
84-
self._original_sigterm_handler = signal.signal(
85-
signal.SIGTERM, self._signal_handler
86-
)
87-
# Also register atexit handler for cleanup on normal exit
88-
atexit.register(self._cleanup_on_exit)
89-
90-
def _signal_handler(self, signum, frame):
91-
"""Handle SIGINT/SIGTERM signals for graceful shutdown."""
92-
if self._shutting_down:
93-
# Already shutting down, force exit on second signal
94-
console.print("\n[red]Forced exit requested, terminating...[/red]")
95-
sys.exit(1)
96-
97-
self._shutting_down = True
98-
sig_name = "SIGINT" if signum == signal.SIGINT else "SIGTERM"
99-
console.print(
100-
f"\n[yellow]Received {sig_name}, initiating graceful shutdown...[/yellow]"
101-
)
102-
103-
self._cleanup_resources()
104-
105-
# Exit cleanly
106-
sys.exit(0)
107-
108-
def _cleanup_on_exit(self):
109-
"""Cleanup handler for atexit - only runs if not already cleaned up."""
110-
if not self._shutting_down:
111-
self._cleanup_resources()
112-
113-
def _cleanup_resources(self):
114-
"""Stop all managed processes."""
77+
def _do_cleanup(self):
78+
"""Perform the actual process cleanup."""
11579
if self.processes:
11680
console.print("[cyan]Stopping managed processes...[/cyan]")
11781
for node_name in list(self.processes.keys()):
@@ -132,15 +96,6 @@ def _cleanup_resources(self):
13296
self.processes.clear()
13397
self.node_rpc_ports.clear()
13498

135-
def remove_signal_handlers(self):
136-
"""Remove signal handlers and restore original handlers."""
137-
if self._original_sigint_handler is not None:
138-
signal.signal(signal.SIGINT, self._original_sigint_handler)
139-
self._original_sigint_handler = None
140-
if self._original_sigterm_handler is not None:
141-
signal.signal(signal.SIGTERM, self._original_sigterm_handler)
142-
self._original_sigterm_handler = None
143-
14499
def _find_binary(self, require: bool = True) -> Optional[str]:
145100
"""Find the merod binary in PATH or common locations.
146101

merobox/commands/cleanup_mixin.py

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
"""
2+
Cleanup Mixin - Thread-safe cleanup coordination for manager classes.
3+
4+
Provides a common pattern for signal handling and resource cleanup that can be
5+
shared between DockerManager and BinaryManager.
6+
"""
7+
8+
import atexit
9+
import os
10+
import signal
11+
import sys
12+
import threading
13+
from abc import ABC, abstractmethod
14+
15+
from rich.console import Console
16+
17+
from merobox.commands.constants import CleanupResult
18+
19+
console = Console()
20+
21+
22+
class CleanupMixin(ABC):
23+
"""Mixin providing thread-safe cleanup coordination for manager classes.
24+
25+
This mixin handles:
26+
- Signal handler registration (SIGINT/SIGTERM)
27+
- Thread-safe cleanup guards (RLock, _cleanup_in_progress, _cleanup_done)
28+
- Atexit handler registration
29+
- Graceful shutdown coordination
30+
31+
Subclasses must implement:
32+
- _do_cleanup(): Perform the actual resource cleanup (stop containers/processes)
33+
34+
Usage:
35+
class MyManager(CleanupMixin):
36+
def __init__(self, enable_signal_handlers=True):
37+
self._init_cleanup_state()
38+
# ... other initialization ...
39+
if enable_signal_handlers:
40+
self._setup_signal_handlers()
41+
42+
def _do_cleanup(self):
43+
# Stop your resources here
44+
pass
45+
"""
46+
47+
def _init_cleanup_state(self):
48+
"""Initialize cleanup-related instance variables.
49+
50+
Call this in __init__ before _setup_signal_handlers().
51+
"""
52+
self._shutting_down = False
53+
self._cleanup_lock = threading.RLock()
54+
self._cleanup_in_progress = False
55+
self._cleanup_done = False
56+
self._original_sigint_handler = None
57+
self._original_sigterm_handler = None
58+
59+
def _setup_signal_handlers(self):
60+
"""Register signal handlers for graceful shutdown."""
61+
self._original_sigint_handler = signal.signal(
62+
signal.SIGINT, self._signal_handler
63+
)
64+
self._original_sigterm_handler = signal.signal(
65+
signal.SIGTERM, self._signal_handler
66+
)
67+
atexit.register(self._cleanup_on_exit)
68+
69+
def _signal_handler(self, signum, frame):
70+
"""Handle SIGINT/SIGTERM signals for graceful shutdown.
71+
72+
Uses sys.exit() to allow proper stack unwinding and finally block
73+
execution. If cleanup is already in progress (e.g., via atexit), we
74+
return without calling sys.exit() to avoid interrupting the ongoing
75+
cleanup with SystemExit.
76+
"""
77+
if self._shutting_down:
78+
console.print("\n[red]Forced exit requested, terminating...[/red]")
79+
sys.stdout.flush()
80+
os._exit(1)
81+
82+
self._shutting_down = True
83+
sig_name = "SIGINT" if signum == signal.SIGINT else "SIGTERM"
84+
console.print(
85+
f"\n[yellow]Received {sig_name}, initiating graceful shutdown...[/yellow]"
86+
)
87+
88+
cleanup_result = self._cleanup_resources()
89+
90+
if cleanup_result != CleanupResult.IN_PROGRESS:
91+
sys.exit(0)
92+
93+
def _cleanup_on_exit(self):
94+
"""Cleanup handler for atexit.
95+
96+
Calls _cleanup_resources unconditionally since it's idempotent and
97+
will return immediately if cleanup was already done or is in progress.
98+
"""
99+
self._cleanup_resources()
100+
101+
def _cleanup_resources_guarded(self, cleanup_fn, *args, **kwargs) -> CleanupResult:
102+
"""Execute cleanup function with thread-safe guards.
103+
104+
Thread-safe guard ensuring at-most-once execution semantics. Uses RLock
105+
to allow re-entrant calls from signal handlers in the same thread.
106+
107+
Args:
108+
cleanup_fn: The cleanup function to execute (typically self._do_cleanup)
109+
*args, **kwargs: Arguments to pass to cleanup_fn
110+
111+
Returns:
112+
CleanupResult.PERFORMED: Cleanup was executed by this call
113+
CleanupResult.ALREADY_DONE: Cleanup was already completed previously
114+
CleanupResult.IN_PROGRESS: Cleanup is currently in progress (re-entrant call)
115+
"""
116+
with self._cleanup_lock:
117+
if self._cleanup_done:
118+
return CleanupResult.ALREADY_DONE
119+
if self._cleanup_in_progress:
120+
return CleanupResult.IN_PROGRESS
121+
self._cleanup_in_progress = True
122+
123+
try:
124+
cleanup_fn(*args, **kwargs)
125+
finally:
126+
self._cleanup_done = True
127+
self._cleanup_in_progress = False
128+
129+
return CleanupResult.PERFORMED
130+
131+
def _cleanup_resources(self) -> CleanupResult:
132+
"""Stop all managed resources with thread-safe guards.
133+
134+
Default implementation that calls _do_cleanup() with no arguments.
135+
Subclasses can override this to add parameters (like drain_timeout).
136+
137+
Returns:
138+
CleanupResult.PERFORMED: Cleanup was executed by this call
139+
CleanupResult.ALREADY_DONE: Cleanup was already completed previously
140+
CleanupResult.IN_PROGRESS: Cleanup is currently in progress (re-entrant call)
141+
"""
142+
return self._cleanup_resources_guarded(self._do_cleanup)
143+
144+
@abstractmethod
145+
def _do_cleanup(self, *args, **kwargs):
146+
"""Perform the actual resource cleanup.
147+
148+
Implement this method to stop containers, processes, or other resources.
149+
This method is called inside the cleanup lock, so it's guaranteed to run
150+
at most once.
151+
152+
Note: Do not call sys.exit() or raise SystemExit from this method.
153+
"""
154+
pass
155+
156+
def remove_signal_handlers(self):
157+
"""Remove signal handlers and restore original handlers."""
158+
if self._original_sigint_handler is not None:
159+
signal.signal(signal.SIGINT, self._original_sigint_handler)
160+
self._original_sigint_handler = None
161+
if self._original_sigterm_handler is not None:
162+
signal.signal(signal.SIGTERM, self._original_sigterm_handler)
163+
self._original_sigterm_handler = None

merobox/commands/constants.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,23 @@
22
Constants and configuration values used across the merobox codebase.
33
"""
44

5+
from enum import Enum, auto
6+
7+
8+
class CleanupResult(Enum):
9+
"""Result of cleanup operation for thread-safe cleanup methods.
10+
11+
Used by DockerManager and BinaryManager to communicate cleanup status:
12+
- PERFORMED: Cleanup was executed by this call
13+
- ALREADY_DONE: Cleanup was already completed by a previous call
14+
- IN_PROGRESS: Cleanup is currently in progress (re-entrant call)
15+
"""
16+
17+
PERFORMED = auto()
18+
ALREADY_DONE = auto()
19+
IN_PROGRESS = auto()
20+
21+
522
# API Endpoints
623
JSONRPC_ENDPOINT = "/jsonrpc"
724
ADMIN_API_BASE = "/admin-api"

0 commit comments

Comments
 (0)