Skip to content

Commit c2c59ca

Browse files
committed
WIP: Add ProcessManager to handle process lifecycle
1 parent 134c658 commit c2c59ca

File tree

1 file changed

+160
-0
lines changed

1 file changed

+160
-0
lines changed
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
"""Process management for shell command execution."""
2+
import asyncio
3+
import logging
4+
import os
5+
from typing import Dict, IO, Optional, Tuple, Any, List
6+
7+
8+
class ProcessManager:
9+
"""Manages process creation, execution, and cleanup for shell commands."""
10+
11+
async def create_process(
12+
self,
13+
shell_cmd: str,
14+
directory: Optional[str],
15+
stdin: Optional[str] = None,
16+
stdout_handle: Any = asyncio.subprocess.PIPE,
17+
envs: Optional[Dict[str, str]] = None,
18+
) -> asyncio.subprocess.Process:
19+
"""Create a new subprocess with the given parameters.
20+
21+
Args:
22+
shell_cmd (str): Shell command to execute
23+
directory (Optional[str]): Working directory
24+
stdin (Optional[str]): Input to be passed to the process
25+
stdout_handle: File handle or PIPE for stdout
26+
envs (Optional[Dict[str, str]]): Additional environment variables
27+
28+
Returns:
29+
asyncio.subprocess.Process: Created process
30+
"""
31+
return await asyncio.create_subprocess_shell(
32+
shell_cmd,
33+
stdin=asyncio.subprocess.PIPE,
34+
stdout=stdout_handle,
35+
stderr=asyncio.subprocess.PIPE,
36+
env={**os.environ, **(envs or {})},
37+
cwd=directory,
38+
)
39+
40+
async def execute_with_timeout(
41+
self,
42+
process: asyncio.subprocess.Process,
43+
stdin: Optional[str] = None,
44+
timeout: Optional[int] = None,
45+
) -> Tuple[bytes, bytes]:
46+
"""Execute the process with timeout handling.
47+
48+
Args:
49+
process: Process to execute
50+
stdin (Optional[str]): Input to pass to the process
51+
timeout (Optional[int]): Timeout in seconds
52+
53+
Returns:
54+
Tuple[bytes, bytes]: Tuple of (stdout, stderr)
55+
56+
Raises:
57+
asyncio.TimeoutError: If execution times out
58+
"""
59+
stdin_bytes = stdin.encode() if stdin else None
60+
61+
async def communicate_with_timeout():
62+
try:
63+
return await process.communicate(input=stdin_bytes)
64+
except Exception as e:
65+
try:
66+
await process.wait()
67+
except Exception:
68+
pass
69+
raise e
70+
71+
if timeout:
72+
return await asyncio.wait_for(communicate_with_timeout(), timeout=timeout)
73+
return await communicate_with_timeout()
74+
75+
async def execute_pipeline(
76+
self,
77+
commands: List[str],
78+
first_stdin: Optional[bytes] = None,
79+
last_stdout: Optional[IO[Any]] = None,
80+
directory: Optional[str] = None,
81+
timeout: Optional[int] = None,
82+
envs: Optional[Dict[str, str]] = None,
83+
) -> Tuple[bytes, bytes, int]:
84+
"""Execute a pipeline of commands.
85+
86+
Args:
87+
commands: List of shell commands to execute in pipeline
88+
first_stdin: Input to pass to the first command
89+
last_stdout: Output handle for the last command
90+
directory: Working directory
91+
timeout: Timeout in seconds
92+
envs: Additional environment variables
93+
94+
Returns:
95+
Tuple[bytes, bytes, int]: Tuple of (stdout, stderr, return_code)
96+
"""
97+
processes: List[asyncio.subprocess.Process] = []
98+
try:
99+
prev_stdout: Optional[bytes] = first_stdin
100+
final_stderr: bytes = b""
101+
final_stdout: bytes = b""
102+
103+
for i, cmd in enumerate(commands):
104+
process = await self.create_process(
105+
cmd,
106+
directory,
107+
stdout_handle=(
108+
asyncio.subprocess.PIPE
109+
if i < len(commands) - 1 or not last_stdout
110+
else last_stdout
111+
),
112+
envs=envs,
113+
)
114+
processes.append(process)
115+
116+
try:
117+
stdout, stderr = await self.execute_with_timeout(
118+
process, stdin=prev_stdout.decode() if prev_stdout else None, timeout=timeout
119+
)
120+
121+
final_stderr += stderr if stderr else b""
122+
if process.returncode != 0:
123+
error_msg = stderr.decode("utf-8", errors="replace").strip()
124+
if not error_msg:
125+
error_msg = f"Command failed with exit code {process.returncode}"
126+
raise ValueError(error_msg)
127+
128+
if i == len(commands) - 1:
129+
if last_stdout and isinstance(last_stdout, IO):
130+
last_stdout.write(stdout.decode("utf-8", errors="replace"))
131+
else:
132+
final_stdout = stdout if stdout else b""
133+
else:
134+
prev_stdout = stdout if stdout else b""
135+
136+
except asyncio.TimeoutError:
137+
process.kill()
138+
raise
139+
except Exception:
140+
process.kill()
141+
raise
142+
143+
return final_stdout, final_stderr, processes[-1].returncode if processes else 1
144+
145+
finally:
146+
await self.cleanup_processes(processes)
147+
148+
async def cleanup_processes(self, processes: List[asyncio.subprocess.Process]) -> None:
149+
"""Clean up processes by killing them if they're still running.
150+
151+
Args:
152+
processes: List of processes to clean up
153+
"""
154+
for process in processes:
155+
if process.returncode is None:
156+
try:
157+
process.kill()
158+
await process.wait()
159+
except Exception as e:
160+
logging.warning(f"Error cleaning up process: {e}")

0 commit comments

Comments
 (0)