Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pip install claude-agent-sdk
**Prerequisites:**
- Python 3.10+
- Node.js
- Claude Code: `npm install -g @anthropic-ai/claude-code`
- Claude Code 2.0.0+: `npm install -g @anthropic-ai/claude-code`

## Quick Start

Expand Down
45 changes: 45 additions & 0 deletions src/claude_agent_sdk/_internal/transport/subprocess_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import json
import logging
import os
import re
import shutil
import sys
from collections.abc import AsyncIterable, AsyncIterator
from contextlib import suppress
from dataclasses import asdict
Expand All @@ -25,6 +27,7 @@
logger = logging.getLogger(__name__)

_DEFAULT_MAX_BUFFER_SIZE = 1024 * 1024 # 1MB buffer limit
MINIMUM_CLAUDE_CODE_VERSION = "2.0.0"


class SubprocessCLITransport(Transport):
Expand Down Expand Up @@ -202,6 +205,8 @@ async def connect(self) -> None:
if self._process:
return

await self._check_claude_version()

cmd = self._build_command()
try:
# Merge environment variables: system -> user -> SDK required
Expand Down Expand Up @@ -448,6 +453,46 @@ async def _read_messages_impl(self) -> AsyncIterator[dict[str, Any]]:
)
raise self._exit_error

async def _check_claude_version(self) -> None:
"""Check Claude Code version and warn if below minimum."""
version_process = None
try:
with anyio.fail_after(2): # 2 second timeout
version_process = await anyio.open_process(
[self._cli_path, "-v"],
stdout=PIPE,
stderr=PIPE,
)

if version_process.stdout:
stdout_bytes = await version_process.stdout.receive()
version_output = stdout_bytes.decode().strip()

match = re.match(r"([0-9]+\.[0-9]+\.[0-9]+)", version_output)
if match:
version = match.group(1)
version_parts = [int(x) for x in version.split(".")]
min_parts = [
int(x) for x in MINIMUM_CLAUDE_CODE_VERSION.split(".")
]

if version_parts < min_parts:
warning = (
f"Warning: Claude Code version {version} is unsupported in the Agent SDK. "
f"Minimum required version is {MINIMUM_CLAUDE_CODE_VERSION}. "
"Some features may not work correctly."
)
logger.warning(warning)
print(warning, file=sys.stderr)
except Exception:
pass
finally:
if version_process:
with suppress(Exception):
version_process.terminate()
with suppress(Exception):
await version_process.wait()

def is_ready(self) -> bool:
"""Check if transport is ready for communication."""
return self._ready
65 changes: 52 additions & 13 deletions tests/test_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,16 @@ def test_connect_close(self):

async def _test():
with patch("anyio.open_process") as mock_exec:
# Mock version check process
mock_version_process = MagicMock()
mock_version_process.stdout = MagicMock()
mock_version_process.stdout.receive = AsyncMock(
return_value=b"2.0.0 (Claude Code)"
)
mock_version_process.terminate = MagicMock()
mock_version_process.wait = AsyncMock()

# Mock main process
mock_process = MagicMock()
mock_process.returncode = None
mock_process.terminate = MagicMock()
Expand All @@ -175,7 +185,8 @@ async def _test():
mock_stdin.aclose = AsyncMock()
mock_process.stdin = mock_stdin

mock_exec.return_value = mock_process
# Return version process first, then main process
mock_exec.side_effect = [mock_version_process, mock_process]

transport = SubprocessCLITransport(
prompt="test",
Expand Down Expand Up @@ -363,13 +374,25 @@ async def _test():
with patch(
"anyio.open_process", new_callable=AsyncMock
) as mock_open_process:
# Mock version check process
mock_version_process = MagicMock()
mock_version_process.stdout = MagicMock()
mock_version_process.stdout.receive = AsyncMock(
return_value=b"2.0.0 (Claude Code)"
)
mock_version_process.terminate = MagicMock()
mock_version_process.wait = AsyncMock()

# Mock main process
mock_process = MagicMock()
mock_process.stdout = MagicMock()
mock_stdin = MagicMock()
mock_stdin.aclose = AsyncMock() # Add async aclose method
mock_process.stdin = mock_stdin
mock_process.returncode = None
mock_open_process.return_value = mock_process

# Return version process first, then main process
mock_open_process.side_effect = [mock_version_process, mock_process]

transport = SubprocessCLITransport(
prompt="test",
Expand All @@ -379,11 +402,13 @@ async def _test():

await transport.connect()

# Verify open_process was called with correct env vars
mock_open_process.assert_called_once()
call_kwargs = mock_open_process.call_args.kwargs
assert "env" in call_kwargs
env_passed = call_kwargs["env"]
# Verify open_process was called twice (version check + main process)
assert mock_open_process.call_count == 2

# Check the second call (main process) for env vars
second_call_kwargs = mock_open_process.call_args_list[1].kwargs
assert "env" in second_call_kwargs
env_passed = second_call_kwargs["env"]

# Check that custom env var was passed
assert env_passed["MY_TEST_VAR"] == test_value
Expand All @@ -410,13 +435,25 @@ async def _test():
with patch(
"anyio.open_process", new_callable=AsyncMock
) as mock_open_process:
# Mock version check process
mock_version_process = MagicMock()
mock_version_process.stdout = MagicMock()
mock_version_process.stdout.receive = AsyncMock(
return_value=b"2.0.0 (Claude Code)"
)
mock_version_process.terminate = MagicMock()
mock_version_process.wait = AsyncMock()

# Mock main process
mock_process = MagicMock()
mock_process.stdout = MagicMock()
mock_stdin = MagicMock()
mock_stdin.aclose = AsyncMock() # Add async aclose method
mock_process.stdin = mock_stdin
mock_process.returncode = None
mock_open_process.return_value = mock_process

# Return version process first, then main process
mock_open_process.side_effect = [mock_version_process, mock_process]

transport = SubprocessCLITransport(
prompt="test",
Expand All @@ -426,11 +463,13 @@ async def _test():

await transport.connect()

# Verify open_process was called with correct user
mock_open_process.assert_called_once()
call_kwargs = mock_open_process.call_args.kwargs
assert "user" in call_kwargs
user_passed = call_kwargs["user"]
# Verify open_process was called twice (version check + main process)
assert mock_open_process.call_count == 2

# Check the second call (main process) for user
second_call_kwargs = mock_open_process.call_args_list[1].kwargs
assert "user" in second_call_kwargs
user_passed = second_call_kwargs["user"]

# Check that user was passed
assert user_passed == "claude"
Expand Down