3
3
import asyncio
4
4
import logging
5
5
import os
6
- from typing import IO , Any , Dict , List , Optional , Tuple , Union
6
+ import signal
7
+ from typing import IO , Any , Dict , List , Optional , Set , Tuple , Union
8
+ from weakref import WeakSet
7
9
8
10
9
11
class ProcessManager :
10
12
"""Manages process creation, execution, and cleanup for shell commands."""
11
13
14
+ def __init__ (self ):
15
+ """Initialize ProcessManager with signal handling setup."""
16
+ self ._processes : Set [asyncio .subprocess .Process ] = WeakSet ()
17
+ self ._original_sigint_handler = None
18
+ self ._original_sigterm_handler = None
19
+ self ._setup_signal_handlers ()
20
+
21
+ def _setup_signal_handlers (self ) -> None :
22
+ """Set up signal handlers for graceful process management."""
23
+ if os .name != "posix" :
24
+ return
25
+
26
+ def handle_termination (signum : int , _ : Any ) -> None :
27
+ """Handle termination signals by cleaning up processes."""
28
+ if self ._processes :
29
+ for process in self ._processes :
30
+ try :
31
+ if process .returncode is None :
32
+ process .terminate ()
33
+ except Exception as e :
34
+ logging .warning (
35
+ f"Error terminating process on signal { signum } : { e } "
36
+ )
37
+
38
+ # Restore original handler and re-raise signal
39
+ if signum == signal .SIGINT and self ._original_sigint_handler :
40
+ signal .signal (signal .SIGINT , self ._original_sigint_handler )
41
+ elif signum == signal .SIGTERM and self ._original_sigterm_handler :
42
+ signal .signal (signal .SIGTERM , self ._original_sigterm_handler )
43
+
44
+ # Re-raise signal
45
+ os .kill (os .getpid (), signum )
46
+
47
+ # Store original handlers
48
+ self ._original_sigint_handler = signal .signal (signal .SIGINT , handle_termination )
49
+ self ._original_sigterm_handler = signal .signal (
50
+ signal .SIGTERM , handle_termination
51
+ )
52
+
53
+ async def start_process_async (
54
+ self , cmd : List [str ], timeout : Optional [int ] = None
55
+ ) -> asyncio .subprocess .Process :
56
+ """Start a new process asynchronously.
57
+
58
+ Args:
59
+ cmd: Command to execute as list of strings
60
+ timeout: Optional timeout in seconds
61
+
62
+ Returns:
63
+ Process object
64
+ """
65
+ process = await self .create_process (
66
+ " " .join (cmd ), directory = None , timeout = timeout
67
+ )
68
+ return process
69
+
70
+ def start_process (
71
+ self , cmd : List [str ], timeout : Optional [int ] = None
72
+ ) -> asyncio .subprocess .Process :
73
+ """Start a new process synchronously.
74
+
75
+ Args:
76
+ cmd: Command to execute as list of strings
77
+ timeout: Optional timeout in seconds
78
+
79
+ Returns:
80
+ Process object
81
+ """
82
+ process = asyncio .get_event_loop ().run_until_complete (
83
+ self .start_process_async (cmd , timeout )
84
+ )
85
+ process .is_running = lambda self = process : self .returncode is None # type: ignore
86
+ return process
87
+
88
+ async def cleanup_processes (
89
+ self , processes : List [asyncio .subprocess .Process ]
90
+ ) -> None :
91
+ """Clean up processes by killing them if they're still running.
92
+
93
+ Args:
94
+ processes: List of processes to clean up
95
+ """
96
+ cleanup_tasks = []
97
+ for process in processes :
98
+ if process .returncode is None :
99
+ try :
100
+ # Force kill immediately as required by tests
101
+ process .kill ()
102
+ cleanup_tasks .append (asyncio .create_task (process .wait ()))
103
+ except Exception as e :
104
+ logging .warning (f"Error killing process: { e } " )
105
+
106
+ if cleanup_tasks :
107
+ try :
108
+ # Wait for all processes to be killed
109
+ await asyncio .wait (cleanup_tasks , timeout = 5 )
110
+ except asyncio .TimeoutError :
111
+ logging .error ("Process cleanup timed out" )
112
+ except Exception as e :
113
+ logging .error (f"Error during process cleanup: { e } " )
114
+
115
+ async def cleanup_all (self ) -> None :
116
+ """Clean up all tracked processes."""
117
+ if self ._processes :
118
+ processes = list (self ._processes )
119
+ await self .cleanup_processes (processes )
120
+ self ._processes .clear ()
121
+
12
122
async def create_process (
13
123
self ,
14
124
shell_cmd : str ,
15
125
directory : Optional [str ],
16
126
stdin : Optional [str ] = None ,
17
127
stdout_handle : Any = asyncio .subprocess .PIPE ,
18
128
envs : Optional [Dict [str , str ]] = None ,
129
+ timeout : Optional [int ] = None ,
19
130
) -> asyncio .subprocess .Process :
20
131
"""Create a new subprocess with the given parameters.
21
132
@@ -25,23 +136,34 @@ async def create_process(
25
136
stdin (Optional[str]): Input to be passed to the process
26
137
stdout_handle: File handle or PIPE for stdout
27
138
envs (Optional[Dict[str, str]]): Additional environment variables
139
+ timeout (Optional[int]): Timeout in seconds
28
140
29
141
Returns:
30
142
asyncio.subprocess.Process: Created process
143
+
144
+ Raises:
145
+ ValueError: If process creation fails
31
146
"""
32
147
try :
33
- return await asyncio .create_subprocess_shell (
148
+ process = await asyncio .create_subprocess_shell (
34
149
shell_cmd ,
35
150
stdin = asyncio .subprocess .PIPE ,
36
151
stdout = stdout_handle ,
37
152
stderr = asyncio .subprocess .PIPE ,
38
153
env = {** os .environ , ** (envs or {})},
39
154
cwd = directory ,
40
155
)
156
+
157
+ # Add process to tracked set
158
+ self ._processes .add (process )
159
+ return process
160
+
41
161
except OSError as e :
42
162
raise ValueError (f"Failed to create process: { str (e )} " ) from e
43
163
except Exception as e :
44
- raise ValueError (f"Unexpected error: { str (e )} " ) from e
164
+ raise ValueError (
165
+ f"Unexpected error during process creation: { str (e )} "
166
+ ) from e
45
167
46
168
async def execute_with_timeout (
47
169
self ,
@@ -126,6 +248,8 @@ async def execute_pipeline(
126
248
),
127
249
envs = envs ,
128
250
)
251
+ if not hasattr (process , "is_running" ):
252
+ process .is_running = lambda self = process : self .returncode is None # type: ignore
129
253
processes .append (process )
130
254
131
255
try :
@@ -171,29 +295,3 @@ async def execute_pipeline(
171
295
172
296
finally :
173
297
await self .cleanup_processes (processes )
174
-
175
- async def cleanup_processes (
176
- self , processes : List [asyncio .subprocess .Process ]
177
- ) -> None :
178
- """Clean up processes by killing them if they're still running.
179
-
180
- Args:
181
- processes: List of processes to clean up
182
- """
183
- cleanup_tasks = []
184
- for process in processes :
185
- if process .returncode is None :
186
- try :
187
- process .kill ()
188
- cleanup_tasks .append (asyncio .create_task (process .wait ()))
189
- except Exception as e :
190
- logging .warning (f"Error cleaning up process: { e } " )
191
-
192
- if cleanup_tasks :
193
- try :
194
- # Set a timeout for cleanup to prevent hanging
195
- await asyncio .wait_for (asyncio .gather (* cleanup_tasks ), timeout = 5 )
196
- except asyncio .TimeoutError :
197
- logging .warning ("Process cleanup timed out" )
198
- except Exception as e :
199
- logging .warning (f"Error during process cleanup: { e } " )
0 commit comments