3
3
import asyncio
4
4
import logging
5
5
import os
6
- from typing import IO , Any , Dict , List , Optional , Tuple
6
+ from typing import IO , Any , Dict , List , Optional , Tuple , Union
7
7
8
8
9
9
class ProcessManager :
@@ -29,14 +29,19 @@ async def create_process(
29
29
Returns:
30
30
asyncio.subprocess.Process: Created process
31
31
"""
32
- return await asyncio .create_subprocess_shell (
33
- shell_cmd ,
34
- stdin = asyncio .subprocess .PIPE ,
35
- stdout = stdout_handle ,
36
- stderr = asyncio .subprocess .PIPE ,
37
- env = {** os .environ , ** (envs or {})},
38
- cwd = directory ,
39
- )
32
+ try :
33
+ return await asyncio .create_subprocess_shell (
34
+ shell_cmd ,
35
+ stdin = asyncio .subprocess .PIPE ,
36
+ stdout = stdout_handle ,
37
+ stderr = asyncio .subprocess .PIPE ,
38
+ env = {** os .environ , ** (envs or {})},
39
+ cwd = directory ,
40
+ )
41
+ except OSError as e :
42
+ raise ValueError (f"Failed to create process: { str (e )} " ) from e
43
+ except Exception as e :
44
+ raise ValueError (f"Unexpected error: { str (e )} " ) from e
40
45
41
46
async def execute_with_timeout (
42
47
self ,
@@ -62,6 +67,9 @@ async def execute_with_timeout(
62
67
async def communicate_with_timeout ():
63
68
try :
64
69
return await process .communicate (input = stdin_bytes )
70
+ except asyncio .TimeoutError :
71
+ process .kill () # Kill process on timeout
72
+ raise
65
73
except Exception as e :
66
74
try :
67
75
await process .wait ()
@@ -77,7 +85,7 @@ async def execute_pipeline(
77
85
self ,
78
86
commands : List [str ],
79
87
first_stdin : Optional [bytes ] = None ,
80
- last_stdout : Optional [IO [Any ]] = None ,
88
+ last_stdout : Union [IO [Any ], int , None ] = None ,
81
89
directory : Optional [str ] = None ,
82
90
timeout : Optional [int ] = None ,
83
91
envs : Optional [Dict [str , str ]] = None ,
@@ -94,7 +102,13 @@ async def execute_pipeline(
94
102
95
103
Returns:
96
104
Tuple[bytes, bytes, int]: Tuple of (stdout, stderr, return_code)
105
+
106
+ Raises:
107
+ ValueError: If no commands provided or command execution fails
97
108
"""
109
+ if not commands :
110
+ raise ValueError ("No commands provided" )
111
+
98
112
processes : List [asyncio .subprocess .Process ] = []
99
113
try :
100
114
prev_stdout : Optional [bytes ] = first_stdin
@@ -166,10 +180,20 @@ async def cleanup_processes(
166
180
Args:
167
181
processes: List of processes to clean up
168
182
"""
183
+ cleanup_tasks = []
169
184
for process in processes :
170
185
if process .returncode is None :
171
186
try :
172
187
process .kill ()
173
- await process .wait ()
188
+ cleanup_tasks . append ( asyncio . create_task ( process .wait ()) )
174
189
except Exception as e :
175
190
logging .warning (f"Error cleaning up process: { e } " )
191
+
192
+ if cleanup_tasks :
193
+ try :
194
+ # Set a timeout for cleanup to prevent hanging
195
+ await asyncio .wait_for (asyncio .gather (* cleanup_tasks ), timeout = 5 )
196
+ except asyncio .TimeoutError :
197
+ logging .warning ("Process cleanup timed out" )
198
+ except Exception as e :
199
+ logging .warning (f"Error during process cleanup: { e } " )
0 commit comments