-
Notifications
You must be signed in to change notification settings - Fork 220
Expand file tree
/
Copy pathcommand.py
More file actions
135 lines (112 loc) · 3.87 KB
/
command.py
File metadata and controls
135 lines (112 loc) · 3.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import os
import shlex
import subprocess
import sys
import threading
from collections.abc import Mapping
from openhands.sdk.logger import get_logger
from openhands.sdk.utils.redact import redact_text_secrets
logger = get_logger(__name__)

# Names of environment variables that must never be handed to subprocesses
# (for example, bash commands executed by the agent). They are credentials
# that allow access to user secrets via the SaaS API, so they have to stay
# confined to the SDK's own Python process.
_SENSITIVE_ENV_VARS = frozenset(("SESSION_API_KEY",))
def sanitized_env(
    env: Mapping[str, str] | None = None,
) -> dict[str, str]:
    """Build a subprocess-safe copy of *env*.

    When *env* is ``None`` the current ``os.environ`` is used as the starting
    point. The input mapping itself is never modified; a new ``dict`` is
    returned.

    Two adjustments are applied to the copy:

    * Every variable named in ``_SENSITIVE_ENV_VARS`` (e.g.
      ``SESSION_API_KEY``) is dropped, so that LLM-driven agents cannot read
      those credentials through terminal commands.
    * PyInstaller-based binaries rewrite ``LD_LIBRARY_PATH`` so that their
      vendored libraries win. If ``LD_LIBRARY_PATH_ORIG`` is present, its
      value is restored into ``LD_LIBRARY_PATH``; when that saved value is
      empty, ``LD_LIBRARY_PATH`` is removed instead.
    """
    snapshot = dict(os.environ if env is None else env)

    # Keep everything except the credentials the agent must not see.
    cleaned: dict[str, str] = {
        name: value
        for name, value in snapshot.items()
        if name not in _SENSITIVE_ENV_VARS
    }

    try:
        saved_library_path = cleaned["LD_LIBRARY_PATH_ORIG"]
    except KeyError:
        # No saved value, so there is nothing to restore.
        return cleaned

    if saved_library_path:
        cleaned["LD_LIBRARY_PATH"] = saved_library_path
    else:
        # The original environment had no usable LD_LIBRARY_PATH; drop the
        # rewritten one rather than passing it on.
        cleaned.pop("LD_LIBRARY_PATH", None)
    return cleaned
def execute_command(
    cmd: list[str] | str,
    env: dict[str, str] | None = None,
    cwd: str | None = None,
    timeout: float | None = None,
    print_output: bool = True,
) -> subprocess.CompletedProcess:
    """Run *cmd* in a subprocess, streaming and capturing its output.

    The child is started with the environment returned by
    ``sanitized_env(env)``. Its stdout and stderr are read line by line on two
    background threads; every line is collected and, when *print_output* is
    true, echoed to this process's ``sys.stdout`` / ``sys.stderr`` as it
    arrives. The command line is logged with secrets redacted.

    Args:
        cmd: Command to run. A string is executed through the shell
            (``shell=True``) so shell operators are handled; a list is
            executed directly, without a shell.
        env: Environment for the child; it is passed through
            ``sanitized_env`` before use.
        cwd: Working directory for the child, or ``None`` to inherit.
        timeout: Seconds to wait for the process to exit, or ``None`` to wait
            without limit.
        print_output: Whether to echo the child's output while it runs.

    Returns:
        A ``subprocess.CompletedProcess`` carrying *cmd*, the exit code, and
        the captured stdout and stderr as text. If *timeout* expires, the
        process is killed and the exit code is reported as ``-1``; the
        captured output is whatever had been read by then (plus a short
        grace period for draining the pipes).

    Raises:
        RuntimeError: If the process or its output pipes could not be set up.
            Errors raised by ``subprocess.Popen`` itself propagate unchanged.
    """
    # Upper bound on how long to keep draining the pipes after a kill. A
    # grandchild (e.g. something the shell spawned) can keep the pipe open
    # long after the direct child is dead; without a bound the timeout path
    # could itself hang indefinitely.
    drain_grace_seconds = 5.0

    # For string commands, use shell=True to handle shell operators properly
    use_shell = isinstance(cmd, str)
    cmd_str = cmd if isinstance(cmd, str) else shlex.join(cmd)

    # Log the command with sensitive values redacted
    logger.info("$ %s", redact_text_secrets(cmd_str))

    proc = subprocess.Popen(
        cmd,
        cwd=cwd,
        env=sanitized_env(env),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        bufsize=1,  # line-buffered, so output is echoed as it is produced
        shell=use_shell,
    )
    # Defensive guards: Popen normally raises instead of returning None, and
    # PIPE normally guarantees both streams exist.
    if proc is None:
        raise RuntimeError("Failed to start process")
    if proc.stdout is None or proc.stderr is None:
        raise RuntimeError("Failed to capture stdout/stderr")

    stdout_lines: list[str] = []
    stderr_lines: list[str] = []

    def read_stream(stream, lines, output_stream):
        """Collect every line of *stream*, optionally echoing it."""
        try:
            for line in stream:
                if print_output:
                    output_stream.write(line)
                    output_stream.flush()
                lines.append(line)
        except Exception as e:
            # Best effort: a failing echo/read must not crash the caller.
            logger.error("Failed to read stream: %s", e)

    # Read stdout and stderr concurrently to avoid deadlock. The readers are
    # daemon threads so that one stuck on a pipe held open by a grandchild
    # cannot block interpreter shutdown.
    stdout_thread = threading.Thread(
        target=read_stream,
        args=(proc.stdout, stdout_lines, sys.stdout),
        daemon=True,
    )
    stderr_thread = threading.Thread(
        target=read_stream,
        args=(proc.stderr, stderr_lines, sys.stderr),
        daemon=True,
    )
    stdout_thread.start()
    stderr_thread.start()

    timed_out = False
    try:
        proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        timed_out = True
        proc.kill()
        # Reap the killed child so it does not linger as a zombie.
        proc.wait()

    join_timeout = drain_grace_seconds if timed_out else timeout
    stdout_thread.join(timeout=join_timeout)
    stderr_thread.join(timeout=join_timeout)

    # Release the pipe file descriptors. A stream whose reader is still
    # blocked on it is left alone: closing it from here could block on the
    # reader's buffer lock.
    for reader, stream in (
        (stdout_thread, proc.stdout),
        (stderr_thread, proc.stderr),
    ):
        if not reader.is_alive():
            stream.close()

    return subprocess.CompletedProcess(
        cmd,
        -1 if timed_out else proc.returncode,  # -1 indicates a timeout
        "".join(stdout_lines),
        "".join(stderr_lines),
    )