Skip to content

Commit 1fbd4da

Browse files
committed
refactor: Add process manager and mock tests
1 parent 947935b commit 1fbd4da

22 files changed

+1999
-1327
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ filterwarnings = [
4545
"ignore::RuntimeWarning:selectors:",
4646
"ignore::pytest.PytestUnhandledCoroutineWarning:",
4747
"ignore::pytest.PytestUnraisableExceptionWarning:",
48+
"ignore::DeprecationWarning:pytest_asyncio.plugin:",
4849
]
4950

5051
[tool.ruff]
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
import shlex
2+
from typing import Dict, List, Tuple, Union
3+
4+
5+
class CommandPreProcessor:
6+
"""
7+
Pre-processes and validates shell commands before execution
8+
"""
9+
10+
def preprocess_command(self, command: List[str]) -> List[str]:
11+
"""
12+
Preprocess the command to handle cases where '|' is attached to a command.
13+
"""
14+
preprocessed_command = []
15+
for token in command:
16+
if token in ["||", "&&", ";"]: # Special shell operators
17+
preprocessed_command.append(token)
18+
elif "|" in token and token != "|":
19+
parts = token.split("|")
20+
preprocessed_command.extend(
21+
[part.strip() for part in parts if part.strip()]
22+
)
23+
preprocessed_command.append("|")
24+
else:
25+
preprocessed_command.append(token)
26+
return preprocessed_command
27+
28+
def clean_command(self, command: List[str]) -> List[str]:
29+
"""
30+
Clean command by trimming whitespace from each part.
31+
Removes empty strings but preserves arguments that are meant to be spaces.
32+
33+
Args:
34+
command (List[str]): Original command and its arguments
35+
36+
Returns:
37+
List[str]: Cleaned command
38+
"""
39+
return [arg for arg in command if arg] # Remove empty strings
40+
41+
def create_shell_command(self, command: List[str]) -> str:
42+
"""
43+
Create a shell command string from a list of arguments.
44+
Handles wildcards and arguments properly.
45+
"""
46+
if not command:
47+
return ""
48+
49+
escaped_args = []
50+
for arg in command:
51+
if arg.isspace():
52+
# Wrap space-only arguments in single quotes
53+
escaped_args.append(f"'{arg}'")
54+
else:
55+
# Properly escape all arguments including those with wildcards
56+
escaped_args.append(shlex.quote(arg.strip()))
57+
58+
return " ".join(escaped_args)
59+
60+
def split_pipe_commands(self, command: List[str]) -> List[List[str]]:
61+
"""
62+
Split commands by pipe operator into separate commands.
63+
64+
Args:
65+
command (List[str]): Command and its arguments with pipe operators
66+
67+
Returns:
68+
List[List[str]]: List of commands split by pipe operator
69+
"""
70+
commands: List[List[str]] = []
71+
current_command: List[str] = []
72+
73+
for arg in command:
74+
if arg.strip() == "|":
75+
if current_command:
76+
commands.append(current_command)
77+
current_command = []
78+
else:
79+
current_command.append(arg)
80+
81+
if current_command:
82+
commands.append(current_command)
83+
84+
return commands
85+
86+
def parse_command(
87+
self, command: List[str]
88+
) -> Tuple[List[str], Dict[str, Union[None, str, bool]]]:
89+
"""
90+
Parse command and extract redirections.
91+
"""
92+
cmd = []
93+
redirects: Dict[str, Union[None, str, bool]] = {
94+
"stdin": None,
95+
"stdout": None,
96+
"stdout_append": False,
97+
}
98+
99+
i = 0
100+
while i < len(command):
101+
token = command[i]
102+
103+
# Shell operators check
104+
if token in ["|", ";", "&&", "||"]:
105+
raise ValueError(f"Unexpected shell operator: {token}")
106+
107+
# Output redirection
108+
if token in [">", ">>"]:
109+
if i + 1 >= len(command):
110+
raise ValueError("Missing path for output redirection")
111+
if i + 1 < len(command) and command[i + 1] in [">", ">>", "<"]:
112+
raise ValueError("Invalid redirection target: operator found")
113+
path = command[i + 1]
114+
redirects["stdout"] = path
115+
redirects["stdout_append"] = token == ">>"
116+
i += 2
117+
continue
118+
119+
# Input redirection
120+
if token == "<":
121+
if i + 1 >= len(command):
122+
raise ValueError("Missing path for input redirection")
123+
path = command[i + 1]
124+
redirects["stdin"] = path
125+
i += 2
126+
continue
127+
128+
cmd.append(token)
129+
i += 1
130+
131+
return cmd, redirects
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
"""
2+
Provides validation for shell commands and ensures they are allowed to be executed.
3+
"""
4+
5+
import os
6+
from typing import Dict, List
7+
8+
9+
class CommandValidator:
10+
"""
11+
Validates shell commands against a whitelist and checks for unsafe operators.
12+
"""
13+
14+
def __init__(self):
15+
"""
16+
Initialize the validator.
17+
"""
18+
pass
19+
20+
def _get_allowed_commands(self) -> set[str]:
21+
"""Get the set of allowed commands from environment variables"""
22+
allow_commands = os.environ.get("ALLOW_COMMANDS", "")
23+
allowed_commands = os.environ.get("ALLOWED_COMMANDS", "")
24+
commands = allow_commands + "," + allowed_commands
25+
return {cmd.strip() for cmd in commands.split(",") if cmd.strip()}
26+
27+
def get_allowed_commands(self) -> list[str]:
28+
"""Get the list of allowed commands from environment variables"""
29+
return list(self._get_allowed_commands())
30+
31+
def is_command_allowed(self, command: str) -> bool:
32+
"""Check if a command is in the allowed list"""
33+
cmd = command.strip()
34+
return cmd in self._get_allowed_commands()
35+
36+
def validate_no_shell_operators(self, cmd: str) -> None:
37+
"""
38+
Validate that the command does not contain shell operators.
39+
40+
Args:
41+
cmd (str): Command to validate
42+
43+
Raises:
44+
ValueError: If the command contains shell operators
45+
"""
46+
if cmd in [";", "&&", "||", "|"]:
47+
raise ValueError(f"Unexpected shell operator: {cmd}")
48+
49+
def validate_pipeline(self, commands: List[str]) -> Dict[str, str]:
50+
"""
51+
Validate pipeline command and ensure all parts are allowed.
52+
53+
Args:
54+
commands (List[str]): List of commands to validate
55+
56+
Returns:
57+
Dict[str, str]: Error message if validation fails, empty dict if success
58+
59+
Raises:
60+
ValueError: If validation fails
61+
"""
62+
current_cmd: List[str] = []
63+
64+
for token in commands:
65+
if token == "|":
66+
if not current_cmd:
67+
raise ValueError("Empty command before pipe operator")
68+
if not self.is_command_allowed(current_cmd[0]):
69+
raise ValueError(f"Command not allowed: {current_cmd[0]}")
70+
current_cmd = []
71+
elif token in [";", "&&", "||"]:
72+
raise ValueError(f"Unexpected shell operator in pipeline: {token}")
73+
else:
74+
current_cmd.append(token)
75+
76+
if current_cmd:
77+
if not self.is_command_allowed(current_cmd[0]):
78+
raise ValueError(f"Command not allowed: {current_cmd[0]}")
79+
80+
return {}
81+
82+
def validate_command(self, command: List[str]) -> None:
83+
"""
84+
Validate if the command is allowed to be executed.
85+
86+
Args:
87+
command (List[str]): Command and its arguments
88+
89+
Raises:
90+
ValueError: If the command is empty, not allowed, or contains invalid shell operators
91+
"""
92+
if not command:
93+
raise ValueError("Empty command")
94+
95+
allowed_commands = self._get_allowed_commands()
96+
if not allowed_commands:
97+
raise ValueError(
98+
"No commands are allowed. Please set ALLOW_COMMANDS environment variable."
99+
)
100+
101+
# Clean and check the first command
102+
cleaned_cmd = command[0].strip()
103+
if cleaned_cmd not in allowed_commands:
104+
raise ValueError(f"Command not allowed: {cleaned_cmd}")
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import os
2+
from typing import Optional
3+
4+
5+
class DirectoryManager:
6+
"""
7+
Manages directory validation and path operations for shell command execution.
8+
"""
9+
10+
def validate_directory(self, directory: Optional[str]) -> None:
11+
"""
12+
Validate if the directory exists and is accessible.
13+
14+
Args:
15+
directory (Optional[str]): Directory path to validate
16+
17+
Raises:
18+
ValueError: If the directory doesn't exist, not absolute or is not accessible
19+
"""
20+
# make directory required
21+
if directory is None:
22+
raise ValueError("Directory is required")
23+
24+
# verify directory is absolute path
25+
if not os.path.isabs(directory):
26+
raise ValueError(f"Directory must be an absolute path: {directory}")
27+
28+
if not os.path.exists(directory):
29+
raise ValueError(f"Directory does not exist: {directory}")
30+
31+
if not os.path.isdir(directory):
32+
raise ValueError(f"Not a directory: {directory}")
33+
34+
if not os.access(directory, os.R_OK | os.X_OK):
35+
raise ValueError(f"Directory is not accessible: {directory}")
36+
37+
def get_absolute_path(self, path: str, base_directory: Optional[str] = None) -> str:
38+
"""
39+
Get absolute path by joining base directory with path if path is relative.
40+
41+
Args:
42+
path (str): The path to make absolute
43+
base_directory (Optional[str]): Base directory to join with relative paths
44+
45+
Returns:
46+
str: Absolute path
47+
"""
48+
if os.path.isabs(path):
49+
return path
50+
if not base_directory:
51+
return os.path.abspath(path)
52+
return os.path.join(base_directory, path)

0 commit comments

Comments
 (0)