-
Notifications
You must be signed in to change notification settings - Fork 709
Expand file tree
/
Copy pathsubprocess_cli.py
More file actions
238 lines (190 loc) · 8.31 KB
/
subprocess_cli.py
File metadata and controls
238 lines (190 loc) · 8.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
"""Subprocess transport implementation using Claude Code CLI."""
import json
import os
import shutil
from collections.abc import AsyncIterator
from pathlib import Path
from subprocess import PIPE
from typing import Any
import anyio
from anyio.abc import Process
from anyio.streams.text import TextReceiveStream
from ..._errors import CLIConnectionError, CLINotFoundError, ProcessError
from ..._errors import CLIJSONDecodeError as SDKJSONDecodeError
from ...types import ClaudeCodeOptions
from . import Transport
_MAX_BUFFER_SIZE = 1024 * 1024 # 1MB buffer limit
class SubprocessCLITransport(Transport):
"""Subprocess transport using Claude Code CLI."""
def __init__(
self,
prompt: str,
options: ClaudeCodeOptions,
cli_path: str | Path | None = None,
):
self._prompt = prompt
self._options = options
self._cli_path = str(cli_path) if cli_path else self._find_cli()
self._cwd = str(options.cwd) if options.cwd else None
self._process: Process | None = None
self._stdout_stream: TextReceiveStream | None = None
self._stderr_stream: TextReceiveStream | None = None
def _find_cli(self) -> str:
"""Find Claude Code CLI binary."""
if cli := shutil.which("claude"):
return cli
locations = [
Path.home() / ".npm-global/bin/claude",
Path("/usr/local/bin/claude"),
Path.home() / ".local/bin/claude",
Path.home() / "node_modules/.bin/claude",
Path.home() / ".yarn/bin/claude",
]
for path in locations:
if path.exists() and path.is_file():
return str(path)
node_installed = shutil.which("node") is not None
if not node_installed:
error_msg = "Claude Code requires Node.js, which is not installed.\n\n"
error_msg += "Install Node.js from: https://nodejs.org/\n"
error_msg += "\nAfter installing Node.js, install Claude Code:\n"
error_msg += " npm install -g @anthropic-ai/claude-code"
raise CLINotFoundError(error_msg)
raise CLINotFoundError(
"Claude Code not found. Install with:\n"
" npm install -g @anthropic-ai/claude-code\n"
"\nIf already installed locally, try:\n"
' export PATH="$HOME/node_modules/.bin:$PATH"\n'
"\nOr specify the path when creating transport:\n"
" SubprocessCLITransport(..., cli_path='/path/to/claude')"
)
def _build_command(self) -> list[str]:
"""Build CLI command with arguments."""
cmd = [self._cli_path, "--output-format", "stream-json", "--verbose"]
if self._options.system_prompt:
cmd.extend(["--system-prompt", self._options.system_prompt])
if self._options.append_system_prompt:
cmd.extend(["--append-system-prompt", self._options.append_system_prompt])
if self._options.allowed_tools:
cmd.extend(["--allowedTools", ",".join(self._options.allowed_tools)])
if self._options.max_turns:
cmd.extend(["--max-turns", str(self._options.max_turns)])
if self._options.disallowed_tools:
cmd.extend(["--disallowedTools", ",".join(self._options.disallowed_tools)])
if self._options.model:
cmd.extend(["--model", self._options.model])
if self._options.permission_prompt_tool_name:
cmd.extend(
["--permission-prompt-tool", self._options.permission_prompt_tool_name]
)
if self._options.permission_mode:
cmd.extend(["--permission-mode", self._options.permission_mode])
if self._options.continue_conversation:
cmd.append("--continue")
if self._options.resume:
cmd.extend(["--resume", self._options.resume])
if self._options.mcp_servers:
cmd.extend(
["--mcp-config", json.dumps({"mcpServers": self._options.mcp_servers})]
)
cmd.extend(["--print", self._prompt])
return cmd
async def connect(self) -> None:
"""Start subprocess."""
if self._process:
return
cmd = self._build_command()
try:
self._process = await anyio.open_process(
cmd,
stdin=None,
stdout=PIPE,
stderr=PIPE,
cwd=self._cwd,
env={**os.environ, "CLAUDE_CODE_ENTRYPOINT": "sdk-py"},
)
if self._process.stdout:
self._stdout_stream = TextReceiveStream(self._process.stdout)
if self._process.stderr:
self._stderr_stream = TextReceiveStream(self._process.stderr)
except FileNotFoundError as e:
raise CLINotFoundError(f"Claude Code not found at: {self._cli_path}") from e
except Exception as e:
raise CLIConnectionError(f"Failed to start Claude Code: {e}") from e
async def disconnect(self) -> None:
"""Terminate subprocess."""
if not self._process:
return
if self._process.returncode is None:
try:
self._process.terminate()
with anyio.fail_after(5.0):
await self._process.wait()
except TimeoutError:
self._process.kill()
await self._process.wait()
except ProcessLookupError:
pass
self._process = None
self._stdout_stream = None
self._stderr_stream = None
async def send_request(self, messages: list[Any], options: dict[str, Any]) -> None:
"""Not used for CLI transport - args passed via command line."""
async def receive_messages(self) -> AsyncIterator[dict[str, Any]]:
"""Receive messages from CLI."""
if not self._process or not self._stdout_stream:
raise CLIConnectionError("Not connected")
stderr_lines = []
async def read_stderr() -> None:
"""Read stderr in background."""
if self._stderr_stream:
try:
async for line in self._stderr_stream:
stderr_lines.append(line.strip())
except anyio.ClosedResourceError:
pass
async with anyio.create_task_group() as tg:
tg.start_soon(read_stderr)
json_buffer = ""
try:
async for line in self._stdout_stream:
line_str = line.strip()
if not line_str:
continue
json_lines = line_str.split("\n")
for json_line in json_lines:
json_line = json_line.strip()
if not json_line:
continue
json_buffer += json_line
if len(json_buffer) > _MAX_BUFFER_SIZE:
json_buffer = ""
raise SDKJSONDecodeError(
f"JSON message exceeded maximum buffer size of {_MAX_BUFFER_SIZE} bytes",
ValueError(
f"Buffer size {len(json_buffer)} exceeds limit {_MAX_BUFFER_SIZE}"
),
)
try:
data = json.loads(json_buffer)
json_buffer = ""
try:
yield data
except GeneratorExit:
return
except json.JSONDecodeError:
continue
except anyio.ClosedResourceError:
pass
await self._process.wait()
if self._process.returncode is not None and self._process.returncode != 0:
stderr_output = "\n".join(stderr_lines)
if stderr_output and "error" in stderr_output.lower():
raise ProcessError(
"CLI process failed",
exit_code=self._process.returncode,
stderr=stderr_output,
)
def is_connected(self) -> bool:
"""Check if subprocess is running."""
return self._process is not None and self._process.returncode is None