3
3
import asyncio
4
4
import logging
5
5
import os
6
- from typing import IO , Any , Dict , List , Optional , Tuple , Union
6
+ import signal
7
+ from typing import IO , Any , Dict , List , Optional , Set , Tuple , Union
8
+ from weakref import WeakSet
7
9
8
10
9
11
class ProcessManager :
10
12
"""Manages process creation, execution, and cleanup for shell commands."""
11
13
14
+ def __init__ (self ):
15
+ """Initialize ProcessManager with signal handling setup."""
16
+ self ._processes : Set [asyncio .subprocess .Process ] = WeakSet ()
17
+ self ._original_sigint_handler = None
18
+ self ._original_sigterm_handler = None
19
+ self ._setup_signal_handlers ()
20
+
21
+ def _setup_signal_handlers (self ) -> None :
22
+ """Set up signal handlers for graceful process management."""
23
+ if os .name != "posix" :
24
+ return
25
+
26
+ def handle_termination (signum : int , _ : Any ) -> None :
27
+ """Handle termination signals by cleaning up processes."""
28
+ if self ._processes :
29
+ for process in self ._processes :
30
+ try :
31
+ if process .returncode is None :
32
+ process .terminate ()
33
+ except Exception as e :
34
+ logging .warning (
35
+ f"Error terminating process on signal { signum } : { e } "
36
+ )
37
+
38
+ # Restore original handler and re-raise signal
39
+ if signum == signal .SIGINT and self ._original_sigint_handler :
40
+ signal .signal (signal .SIGINT , self ._original_sigint_handler )
41
+ elif signum == signal .SIGTERM and self ._original_sigterm_handler :
42
+ signal .signal (signal .SIGTERM , self ._original_sigterm_handler )
43
+
44
+ # Re-raise signal
45
+ os .kill (os .getpid (), signum )
46
+
47
+ # Store original handlers
48
+ self ._original_sigint_handler = signal .signal (signal .SIGINT , handle_termination )
49
+ self ._original_sigterm_handler = signal .signal (
50
+ signal .SIGTERM , handle_termination
51
+ )
52
+
53
+ async def start_process_async (
54
+ self , cmd : List [str ], timeout : Optional [int ] = None
55
+ ) -> asyncio .subprocess .Process :
56
+ """Start a new process asynchronously.
57
+
58
+ Args:
59
+ cmd: Command to execute as list of strings
60
+ timeout: Optional timeout in seconds
61
+
62
+ Returns:
63
+ Process object
64
+ """
65
+ process = await self .create_process (
66
+ " " .join (cmd ), directory = None , timeout = timeout
67
+ )
68
+ process .is_running = lambda self = process : self .returncode is None # type: ignore
69
+ return process
70
+
71
+ async def start_process (
72
+ self , cmd : List [str ], timeout : Optional [int ] = None
73
+ ) -> asyncio .subprocess .Process :
74
+ """Start a new process asynchronously.
75
+
76
+ Args:
77
+ cmd: Command to execute as list of strings
78
+ timeout: Optional timeout in seconds
79
+
80
+ Returns:
81
+ Process object
82
+ """
83
+ process = await self .create_process (
84
+ " " .join (cmd ), directory = None , timeout = timeout
85
+ )
86
+ process .is_running = lambda self = process : self .returncode is None # type: ignore
87
+ return process
88
+
89
+ async def cleanup_processes (
90
+ self , processes : List [asyncio .subprocess .Process ]
91
+ ) -> None :
92
+ """Clean up processes by killing them if they're still running.
93
+
94
+ Args:
95
+ processes: List of processes to clean up
96
+ """
97
+ cleanup_tasks = []
98
+ for process in processes :
99
+ if process .returncode is None :
100
+ try :
101
+ # Force kill immediately as required by tests
102
+ process .kill ()
103
+ cleanup_tasks .append (asyncio .create_task (process .wait ()))
104
+ except Exception as e :
105
+ logging .warning (f"Error killing process: { e } " )
106
+
107
+ if cleanup_tasks :
108
+ try :
109
+ # Wait for all processes to be killed
110
+ await asyncio .wait (cleanup_tasks , timeout = 5 )
111
+ except asyncio .TimeoutError :
112
+ logging .error ("Process cleanup timed out" )
113
+ except Exception as e :
114
+ logging .error (f"Error during process cleanup: { e } " )
115
+
116
+ async def cleanup_all (self ) -> None :
117
+ """Clean up all tracked processes."""
118
+ if self ._processes :
119
+ processes = list (self ._processes )
120
+ await self .cleanup_processes (processes )
121
+ self ._processes .clear ()
122
+
12
123
async def create_process (
13
124
self ,
14
125
shell_cmd : str ,
15
126
directory : Optional [str ],
16
127
stdin : Optional [str ] = None ,
17
128
stdout_handle : Any = asyncio .subprocess .PIPE ,
18
129
envs : Optional [Dict [str , str ]] = None ,
130
+ timeout : Optional [int ] = None ,
19
131
) -> asyncio .subprocess .Process :
20
132
"""Create a new subprocess with the given parameters.
21
133
@@ -25,23 +137,34 @@ async def create_process(
25
137
stdin (Optional[str]): Input to be passed to the process
26
138
stdout_handle: File handle or PIPE for stdout
27
139
envs (Optional[Dict[str, str]]): Additional environment variables
140
+ timeout (Optional[int]): Timeout in seconds
28
141
29
142
Returns:
30
143
asyncio.subprocess.Process: Created process
144
+
145
+ Raises:
146
+ ValueError: If process creation fails
31
147
"""
32
148
try :
33
- return await asyncio .create_subprocess_shell (
149
+ process = await asyncio .create_subprocess_shell (
34
150
shell_cmd ,
35
151
stdin = asyncio .subprocess .PIPE ,
36
152
stdout = stdout_handle ,
37
153
stderr = asyncio .subprocess .PIPE ,
38
154
env = {** os .environ , ** (envs or {})},
39
155
cwd = directory ,
40
156
)
157
+
158
+ # Add process to tracked set
159
+ self ._processes .add (process )
160
+ return process
161
+
41
162
except OSError as e :
42
163
raise ValueError (f"Failed to create process: { str (e )} " ) from e
43
164
except Exception as e :
44
- raise ValueError (f"Unexpected error: { str (e )} " ) from e
165
+ raise ValueError (
166
+ f"Unexpected error during process creation: { str (e )} "
167
+ ) from e
45
168
46
169
async def execute_with_timeout (
47
170
self ,
@@ -64,22 +187,38 @@ async def execute_with_timeout(
64
187
"""
65
188
stdin_bytes = stdin .encode () if stdin else None
66
189
67
- async def communicate_with_timeout ():
190
+ async def _kill_process ():
191
+ if process .returncode is not None :
192
+ return
193
+
68
194
try :
69
- return await process .communicate (input = stdin_bytes )
70
- except asyncio .TimeoutError :
71
- process .kill () # Kill process on timeout
72
- raise
195
+ # Try graceful termination first
196
+ process .terminate ()
197
+ for _ in range (5 ): # Wait up to 0.5 seconds
198
+ if process .returncode is not None :
199
+ return
200
+ await asyncio .sleep (0.1 )
201
+
202
+ # Force kill if still running
203
+ if process .returncode is None :
204
+ process .kill ()
205
+ await asyncio .wait_for (process .wait (), timeout = 1.0 )
73
206
except Exception as e :
74
- try :
75
- await process .wait ()
76
- except Exception :
77
- pass
78
- raise e
207
+ logging .warning (f"Error killing process: { e } " )
79
208
80
- if timeout :
81
- return await asyncio .wait_for (communicate_with_timeout (), timeout = timeout )
82
- return await communicate_with_timeout ()
209
+ try :
210
+ if timeout :
211
+ try :
212
+ return await asyncio .wait_for (
213
+ process .communicate (input = stdin_bytes ), timeout = timeout
214
+ )
215
+ except asyncio .TimeoutError :
216
+ await _kill_process ()
217
+ raise
218
+ return await process .communicate (input = stdin_bytes )
219
+ except Exception as e :
220
+ await _kill_process ()
221
+ raise e
83
222
84
223
async def execute_pipeline (
85
224
self ,
@@ -126,6 +265,8 @@ async def execute_pipeline(
126
265
),
127
266
envs = envs ,
128
267
)
268
+ if not hasattr (process , "is_running" ):
269
+ process .is_running = lambda self = process : self .returncode is None # type: ignore
129
270
processes .append (process )
130
271
131
272
try :
@@ -171,29 +312,3 @@ async def execute_pipeline(
171
312
172
313
finally :
173
314
await self .cleanup_processes (processes )
174
-
175
- async def cleanup_processes (
176
- self , processes : List [asyncio .subprocess .Process ]
177
- ) -> None :
178
- """Clean up processes by killing them if they're still running.
179
-
180
- Args:
181
- processes: List of processes to clean up
182
- """
183
- cleanup_tasks = []
184
- for process in processes :
185
- if process .returncode is None :
186
- try :
187
- process .kill ()
188
- cleanup_tasks .append (asyncio .create_task (process .wait ()))
189
- except Exception as e :
190
- logging .warning (f"Error cleaning up process: { e } " )
191
-
192
- if cleanup_tasks :
193
- try :
194
- # Set a timeout for cleanup to prevent hanging
195
- await asyncio .wait_for (asyncio .gather (* cleanup_tasks ), timeout = 5 )
196
- except asyncio .TimeoutError :
197
- logging .warning ("Process cleanup timed out" )
198
- except Exception as e :
199
- logging .warning (f"Error during process cleanup: { e } " )
0 commit comments