Skip to content

Commit 8a3dcf0

Browse files
committed
refactor: separate IORedirectionHandler
- Extract IORedirectionHandler class from ShellExecutor - Improve pipeline execution and error handling - Unify error messages and improve file handle error handling - Add tests for IO redirection and error cases
1 parent d058819 commit 8a3dcf0

7 files changed

+509
-296
lines changed
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
"""IO redirection handling module for MCP Shell Server."""
2+
3+
import asyncio
4+
import logging
5+
import os
6+
from typing import IO, Any, Dict, List, Optional, Tuple, Union
7+
8+
9+
class IORedirectionHandler:
10+
"""Handles input/output redirection for shell commands."""
11+
12+
def validate_redirection_syntax(self, command: List[str]) -> None:
13+
"""
14+
Validate the syntax of redirection operators in the command.
15+
16+
Args:
17+
command (List[str]): Command and its arguments including redirections
18+
19+
Raises:
20+
ValueError: If the redirection syntax is invalid
21+
"""
22+
prev_token = None
23+
for token in command:
24+
if token in [">", ">>", "<"]:
25+
if prev_token and prev_token in [">", ">>", "<"]:
26+
raise ValueError(
27+
"Invalid redirection syntax: consecutive operators"
28+
)
29+
prev_token = token
30+
31+
def process_redirections(
32+
self, command: List[str]
33+
) -> Tuple[List[str], Dict[str, Union[Optional[str], bool]]]:
34+
"""
35+
Process input/output redirections in the command.
36+
37+
Args:
38+
command (List[str]): Command and its arguments including redirections
39+
40+
Returns:
41+
Tuple[List[str], Dict[str, Any]]: Processed command without redirections and
42+
redirection configuration
43+
44+
Raises:
45+
ValueError: If the redirection syntax is invalid
46+
"""
47+
self.validate_redirection_syntax(command)
48+
49+
cmd = []
50+
redirects: Dict[str, Union[Optional[str], bool]] = {
51+
"stdin": None,
52+
"stdout": None,
53+
"stdout_append": False,
54+
}
55+
56+
i = 0
57+
while i < len(command):
58+
token = command[i]
59+
60+
# Output redirection
61+
if token in [">", ">>"]:
62+
if i + 1 >= len(command):
63+
raise ValueError("Missing path for output redirection")
64+
path = command[i + 1]
65+
if path in [">", ">>", "<"]:
66+
raise ValueError("Invalid redirection syntax")
67+
redirects["stdout"] = path
68+
redirects["stdout_append"] = token == ">>"
69+
i += 2
70+
continue
71+
72+
# Input redirection
73+
if token == "<":
74+
if i + 1 >= len(command):
75+
raise ValueError("Missing path for input redirection")
76+
path = command[i + 1]
77+
if path in [">", ">>", "<"]:
78+
raise ValueError("Invalid redirection syntax")
79+
redirects["stdin"] = path
80+
i += 2
81+
continue
82+
83+
cmd.append(token)
84+
i += 1
85+
86+
return cmd, redirects
87+
88+
async def setup_redirects(
89+
self,
90+
redirects: Dict[str, Union[Optional[str], bool]],
91+
directory: Optional[str] = None,
92+
) -> Dict[str, Union[IO[Any], int, str, None]]:
93+
"""
94+
Set up file handles for redirections.
95+
96+
Args:
97+
redirects (Dict[str, Union[Optional[str], bool]]): Redirection configuration
98+
directory (Optional[str]): Working directory for file paths
99+
100+
Returns:
101+
Dict[str, Union[IO[Any], int, str, None]]: File handles for subprocess
102+
103+
Raises:
104+
ValueError: If there are issues opening the files
105+
"""
106+
handles: Dict[str, Union[IO[Any], int, str, None]] = {}
107+
108+
# Initialize default handles
109+
handles["stdout"] = asyncio.subprocess.PIPE
110+
handles["stderr"] = asyncio.subprocess.PIPE
111+
112+
# Handle input redirection
113+
stdin_path = redirects["stdin"]
114+
if isinstance(stdin_path, str):
115+
path = self._resolve_path(directory, stdin_path)
116+
try:
117+
with open(path, "r") as file:
118+
handles["stdin"] = asyncio.subprocess.PIPE
119+
handles["stdin_data"] = file.read()
120+
except (FileNotFoundError, IOError) as e:
121+
raise ValueError("Failed to open input file") from e
122+
123+
# Handle output redirection
124+
stdout_path = redirects.get("stdout")
125+
if isinstance(stdout_path, str):
126+
path = self._resolve_path(directory, stdout_path)
127+
mode = "a" if redirects["stdout_append"] else "w"
128+
try:
129+
handles["stdout"] = open(path, mode)
130+
except (FileNotFoundError, IOError) as e:
131+
raise ValueError("Failed to open output file") from e
132+
133+
return handles
134+
135+
def _resolve_path(self, directory: Optional[str], path: str) -> str:
136+
"""Resolves a file path, considering the working directory."""
137+
return os.path.join(directory or "", str(path)) if directory else str(path)
138+
139+
async def cleanup_handles(
140+
self, handles: Dict[str, Union[IO[Any], int, None]]
141+
) -> None:
142+
"""
143+
Clean up file handles after command execution.
144+
145+
Args:
146+
handles (Dict[str, Union[IO[Any], int, None]]): File handles to clean up
147+
148+
Note:
149+
This method suppresses IOError and ValueError that might occur during cleanup,
150+
as they are not critical at this point.
151+
"""
152+
errors = []
153+
for key in ["stdout", "stderr"]:
154+
handle = handles.get(key)
155+
if handle and hasattr(handle, "close") and not isinstance(handle, int):
156+
try:
157+
handle.close()
158+
except (IOError, ValueError) as e:
159+
# Log the error but continue with cleanup
160+
errors.append(f"Error closing {key}: {str(e)}")
161+
logging.warning(f"Error closing {key}: {str(e)}")

0 commit comments

Comments
 (0)