Skip to content

Commit 4dac512

Browse files
committed
WIP: Split IORedirectionHandler from ShellExecutor
- Created new IORedirectionHandler class - Moved IO redirection related methods - Still need to fix tests and resolve issues - Known issues: - stdin/stdout processing - pipeline execution - test failures
1 parent d058819 commit 4dac512

File tree

2 files changed

+190
-199
lines changed

2 files changed

+190
-199
lines changed
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
"""IO redirection handling module for MCP Shell Server."""
2+
3+
import asyncio
4+
import os
5+
from typing import IO, Any, Dict, List, Optional, Tuple, Union
6+
7+
8+
class IORedirectionHandler:
9+
"""Handles input/output redirection for shell commands."""
10+
11+
def validate_redirection_syntax(self, command: List[str]) -> None:
12+
"""
13+
Validate the syntax of redirection operators in the command.
14+
15+
Args:
16+
command (List[str]): Command and its arguments including redirections
17+
18+
Raises:
19+
ValueError: If the redirection syntax is invalid
20+
"""
21+
prev_token = None
22+
for token in command:
23+
if token in [">", ">>", "<"]:
24+
if prev_token and prev_token in [">", ">>", "<"]:
25+
raise ValueError(
26+
"Invalid redirection syntax: consecutive operators"
27+
)
28+
prev_token = token
29+
30+
def process_redirections(
31+
self, command: List[str]
32+
) -> Tuple[List[str], Dict[str, Union[None, str, bool]]]:
33+
"""
34+
Process input/output redirections in the command.
35+
36+
Args:
37+
command (List[str]): Command and its arguments including redirections
38+
39+
Returns:
40+
Tuple[List[str], Dict[str, Any]]: Processed command without redirections and
41+
redirection configuration
42+
43+
Raises:
44+
ValueError: If the redirection syntax is invalid
45+
"""
46+
self.validate_redirection_syntax(command)
47+
48+
cmd = []
49+
redirects: Dict[str, Union[None, str, bool]] = {
50+
"stdin": None,
51+
"stdout": None,
52+
"stdout_append": False,
53+
}
54+
55+
i = 0
56+
while i < len(command):
57+
token = command[i]
58+
59+
# Output redirection
60+
if token in [">", ">>"]:
61+
if i + 1 >= len(command):
62+
raise ValueError("Missing path for output redirection")
63+
if i + 1 < len(command) and command[i + 1] in [">", ">>", "<"]:
64+
raise ValueError("Invalid redirection target: operator found")
65+
path = command[i + 1]
66+
redirects["stdout"] = path
67+
redirects["stdout_append"] = token == ">>"
68+
i += 2
69+
continue
70+
71+
# Input redirection
72+
if token == "<":
73+
if i + 1 >= len(command):
74+
raise ValueError("Missing path for input redirection")
75+
path = command[i + 1]
76+
if path in [">", ">>", "<"]:
77+
raise ValueError("Invalid redirection target: operator found")
78+
redirects["stdin"] = path
79+
i += 2
80+
continue
81+
82+
cmd.append(token)
83+
i += 1
84+
85+
return cmd, redirects
86+
87+
async def setup_redirects(
88+
self,
89+
redirects: Dict[str, Union[None, str, bool]],
90+
directory: Optional[str] = None,
91+
) -> Dict[str, Union[IO[Any], int, str, None]]:
92+
"""
93+
Set up file handles for redirections.
94+
95+
Args:
96+
redirects (Dict[str, Union[None, str, bool]]): Redirection configuration
97+
directory (Optional[str]): Working directory for file paths
98+
99+
Returns:
100+
Dict[str, Union[IO[Any], int, str, None]]: File handles for subprocess
101+
"""
102+
handles: Dict[str, Union[IO[Any], int, str, None]] = {}
103+
104+
# Handle input redirection
105+
if redirects["stdin"]:
106+
path = (
107+
os.path.join(directory or "", str(redirects["stdin"]))
108+
if directory and redirects["stdin"]
109+
else str(redirects["stdin"])
110+
)
111+
try:
112+
file = open(path, "r")
113+
handles["stdin"] = asyncio.subprocess.PIPE
114+
handles["stdin_data"] = file.read()
115+
file.close()
116+
except IOError as e:
117+
raise ValueError(f"Failed to open input file: {e}") from e
118+
119+
# Handle output redirection
120+
if redirects["stdout"]:
121+
path = (
122+
os.path.join(directory or "", str(redirects["stdout"]))
123+
if directory and redirects["stdout"]
124+
else str(redirects["stdout"])
125+
)
126+
mode = "a" if redirects["stdout_append"] else "w"
127+
try:
128+
handles["stdout"] = open(path, mode)
129+
except IOError as e:
130+
raise ValueError(f"Failed to open output file: {e}") from e
131+
else:
132+
handles["stdout"] = asyncio.subprocess.PIPE
133+
134+
handles["stderr"] = asyncio.subprocess.PIPE
135+
136+
return handles
137+
138+
async def cleanup_handles(
139+
self, handles: Dict[str, Union[IO[Any], int, None]]
140+
) -> None:
141+
"""
142+
Clean up file handles after command execution.
143+
144+
Args:
145+
handles (Dict[str, Union[IO[Any], int, None]]): File handles to clean up
146+
"""
147+
for key in ["stdout", "stderr"]:
148+
handle = handles.get(key)
149+
if handle and hasattr(handle, "close") and not isinstance(handle, int):
150+
try:
151+
handle.close()
152+
except (IOError, ValueError):
153+
pass

0 commit comments

Comments
 (0)