1
1
import os
2
2
import time
3
3
import asyncio
4
+ import shlex
4
5
from typing import Dict , List , Optional , Any
5
6
6
7
class ShellExecutor :
8
+ """
9
+ Executes shell commands in a secure manner by validating against a whitelist.
10
+ """
11
+
7
12
def __init__ (self ):
8
- # Allow whitespace in ALLOW_COMMANDS and trim each command
13
+ """
14
+ Initialize the executor. The allowed commands are read from ALLOW_COMMANDS
15
+ environment variable during command validation, not at initialization.
16
+ """
17
+ pass
18
+
19
+ def _get_allowed_commands (self ) -> set :
20
+ """
21
+ Get the set of allowed commands from environment variable.
22
+
23
+ Returns:
24
+ set: Set of allowed command names
25
+ """
9
26
allow_commands = os .environ .get ("ALLOW_COMMANDS" , "" )
10
- self . allowed_commands = set ( cmd .strip () for cmd in allow_commands .split ("," ) if cmd .strip ())
27
+ return { cmd .strip () for cmd in allow_commands .split ("," ) if cmd .strip ()}
11
28
12
29
def _validate_command (self , command : List [str ]) -> None :
30
+ """
31
+ Validate if the command is allowed to be executed.
32
+
33
+ Args:
34
+ command (List[str]): Command and its arguments
35
+
36
+ Raises:
37
+ ValueError: If the command is empty, not allowed, or contains invalid shell operators
38
+ """
13
39
if not command :
14
40
raise ValueError ("Empty command" )
15
41
16
- # Check first command
17
- if command [0 ] not in self .allowed_commands :
42
+ allowed_commands = self ._get_allowed_commands ()
43
+ if not allowed_commands :
44
+ raise ValueError ("No commands are allowed. Please set ALLOW_COMMANDS environment variable." )
45
+
46
+ if command [0 ] not in allowed_commands :
18
47
raise ValueError (f"Command not allowed: { command [0 ]} " )
19
48
20
- # Check for shell operators and subsequent commands
21
- for arg in command [1 :]:
49
+ # Check for shell operators and validate subsequent commands
50
+ for i , arg in enumerate ( command [1 :], start = 1 ) :
22
51
if arg in [";" , "&&" , "||" , "|" ]:
23
- next_cmd_idx = command . index ( arg ) + 1
24
- if next_cmd_idx < len ( command ):
25
- next_cmd = command [next_cmd_idx ]
26
- if next_cmd not in self . allowed_commands :
27
- raise ValueError (f"Command not allowed: { next_cmd } " )
52
+ if i + 1 >= len ( command ):
53
+ raise ValueError ( f"Unexpected shell operator: { arg } " )
54
+ next_cmd = command [i + 1 ]
55
+ if next_cmd not in allowed_commands :
56
+ raise ValueError (f"Command not allowed after { arg } : { next_cmd } " )
28
57
29
58
async def execute (self , command : List [str ], stdin : Optional [str ] = None ) -> Dict [str , Any ]:
59
+ """
60
+ Execute a shell command with optional stdin input.
61
+
62
+ Args:
63
+ command (List[str]): Command and its arguments
64
+ stdin (Optional[str]): Input to be passed to the command via stdin
65
+
66
+ Returns:
67
+ Dict[str, Any]: Execution result containing stdout, stderr, status code, and execution time.
68
+ If error occurs, result contains additional 'error' field.
69
+ """
70
+ start_time = time .time ()
71
+
30
72
try :
31
73
self ._validate_command (command )
32
74
except ValueError as e :
@@ -35,28 +77,41 @@ async def execute(self, command: List[str], stdin: Optional[str] = None) -> Dict
35
77
"status" : 1 ,
36
78
"stdout" : "" ,
37
79
"stderr" : str (e ),
38
- "execution_time" : 0
80
+ "execution_time" : time . time () - start_time
39
81
}
40
82
41
- start_time = time .time ()
42
-
43
- process = await asyncio .create_subprocess_exec (
44
- * command ,
45
- stdin = asyncio .subprocess .PIPE if stdin else None ,
46
- stdout = asyncio .subprocess .PIPE ,
47
- stderr = asyncio .subprocess .PIPE
48
- )
49
-
50
- if stdin :
51
- stdout , stderr = await process .communicate (stdin .encode ())
52
- else :
53
- stdout , stderr = await process .communicate ()
54
-
55
- execution_time = time .time () - start_time
56
-
57
- return {
58
- "stdout" : stdout .decode () if stdout else "" ,
59
- "stderr" : stderr .decode () if stderr else "" ,
60
- "status" : process .returncode ,
61
- "execution_time" : execution_time
62
- }
83
+ try :
84
+ process = await asyncio .create_subprocess_exec (
85
+ command [0 ],
86
+ * command [1 :],
87
+ stdin = asyncio .subprocess .PIPE if stdin else None ,
88
+ stdout = asyncio .subprocess .PIPE ,
89
+ stderr = asyncio .subprocess .PIPE ,
90
+ env = {"PATH" : os .environ .get ("PATH" , "" )}
91
+ )
92
+
93
+ stdin_bytes = stdin .encode () if stdin else None
94
+ stdout , stderr = await process .communicate (input = stdin_bytes )
95
+
96
+ return {
97
+ "stdout" : stdout .decode () if stdout else "" ,
98
+ "stderr" : stderr .decode () if stderr else "" ,
99
+ "status" : process .returncode ,
100
+ "execution_time" : time .time () - start_time
101
+ }
102
+ except FileNotFoundError :
103
+ return {
104
+ "error" : f"Command not found: { command [0 ]} " ,
105
+ "status" : 1 ,
106
+ "stdout" : "" ,
107
+ "stderr" : f"Command not found: { command [0 ]} " ,
108
+ "execution_time" : time .time () - start_time
109
+ }
110
+ except Exception as e :
111
+ return {
112
+ "error" : str (e ),
113
+ "status" : 1 ,
114
+ "stdout" : "" ,
115
+ "stderr" : str (e ),
116
+ "execution_time" : time .time () - start_time
117
+ }
0 commit comments