Skip to content

Commit 5f90eb8

Browse files
dicksontsairushilpatel0
authored andcommitted
Implement proper client and bidi streaming
1 parent 055d60f commit 5f90eb8

File tree

7 files changed

+633
-144
lines changed

7 files changed

+633
-144
lines changed

examples/streaming_mode_example.py

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
#!/usr/bin/env python3
2+
"""Example demonstrating streaming mode with bidirectional communication."""
3+
4+
import asyncio
5+
from collections.abc import AsyncIterator
6+
7+
from claude_code_sdk import ClaudeCodeOptions, ClaudeSDKClient, query
8+
9+
10+
async def create_message_stream() -> AsyncIterator[dict]:
11+
"""Create an async stream of user messages."""
12+
# Example messages to send
13+
messages = [
14+
{
15+
"type": "user",
16+
"message": {
17+
"role": "user",
18+
"content": "Hello! Please tell me a bit about Python async programming.",
19+
},
20+
"parent_tool_use_id": None,
21+
"session_id": "example-session-1",
22+
},
23+
# Add a delay to simulate interactive conversation
24+
None, # We'll use this as a signal to delay
25+
{
26+
"type": "user",
27+
"message": {
28+
"role": "user",
29+
"content": "Can you give me a simple code example?",
30+
},
31+
"parent_tool_use_id": None,
32+
"session_id": "example-session-1",
33+
},
34+
]
35+
36+
for msg in messages:
37+
if msg is None:
38+
await asyncio.sleep(2) # Simulate user thinking time
39+
continue
40+
yield msg
41+
42+
43+
async def example_string_mode():
44+
"""Example using traditional string mode (backward compatible)."""
45+
print("=== String Mode Example ===")
46+
47+
# Option 1: Using query function
48+
async for message in query(
49+
prompt="What is 2+2? Please give a brief answer.", options=ClaudeCodeOptions()
50+
):
51+
print(f"Received: {type(message).__name__}")
52+
if hasattr(message, "content"):
53+
print(f" Content: {message.content}")
54+
55+
print("Completed\n")
56+
57+
58+
async def example_streaming_mode():
59+
"""Example using new streaming mode with async iterable."""
60+
print("=== Streaming Mode Example ===")
61+
62+
options = ClaudeCodeOptions()
63+
64+
# Create message stream
65+
message_stream = create_message_stream()
66+
67+
# Use query with async iterable
68+
message_count = 0
69+
async for message in query(prompt=message_stream, options=options):
70+
message_count += 1
71+
msg_type = type(message).__name__
72+
73+
print(f"\nMessage #{message_count} ({msg_type}):")
74+
75+
if hasattr(message, "content"):
76+
content = message.content
77+
if isinstance(content, list):
78+
for block in content:
79+
if hasattr(block, "text"):
80+
print(f" {block.text}")
81+
else:
82+
print(f" {content}")
83+
elif hasattr(message, "subtype"):
84+
print(f" Subtype: {message.subtype}")
85+
86+
print("\nCompleted")
87+
88+
89+
async def example_with_context_manager():
90+
"""Example using context manager for cleaner code."""
91+
print("=== Context Manager Example ===")
92+
93+
# Simple one-shot query with automatic cleanup
94+
async with ClaudeSDKClient() as client:
95+
await client.send_message("What is the meaning of life?")
96+
async for message in client.receive_messages():
97+
if hasattr(message, "content"):
98+
print(f"Response: {message.content}")
99+
100+
print("\nCompleted with automatic cleanup\n")
101+
102+
103+
async def example_with_interrupt():
104+
"""Example demonstrating interrupt functionality."""
105+
print("=== Streaming Mode with Interrupt Example ===")
106+
107+
options = ClaudeCodeOptions()
108+
client = ClaudeSDKClient(options=options)
109+
110+
async def interruptible_stream():
111+
"""Stream that we'll interrupt."""
112+
yield {
113+
"type": "user",
114+
"message": {
115+
"role": "user",
116+
"content": "Count to 1000 slowly, saying each number.",
117+
},
118+
"parent_tool_use_id": None,
119+
"session_id": "interrupt-example",
120+
}
121+
# Keep the stream open by waiting indefinitely
122+
# This prevents stdin from being closed
123+
await asyncio.Event().wait()
124+
125+
try:
126+
await client.connect(interruptible_stream())
127+
print("Connected - will interrupt after 3 seconds")
128+
129+
# Create tasks for receiving and interrupting
130+
async def receive_and_interrupt():
131+
# Start a background task to continuously receive messages
132+
async def receive_messages():
133+
async for message in client.receive_messages():
134+
msg_type = type(message).__name__
135+
print(f"Received: {msg_type}")
136+
137+
if hasattr(message, "content") and isinstance(
138+
message.content, list
139+
):
140+
for block in message.content:
141+
if hasattr(block, "text"):
142+
print(f" {block.text[:50]}...") # First 50 chars
143+
144+
# Start receiving in background
145+
receive_task = asyncio.create_task(receive_messages())
146+
147+
# Wait 3 seconds then interrupt
148+
await asyncio.sleep(3)
149+
print("\nSending interrupt signal...")
150+
151+
try:
152+
await client.interrupt()
153+
print("Interrupt sent successfully")
154+
except Exception as e:
155+
print(f"Interrupt error: {e}")
156+
157+
# Give some time to see any final messages
158+
await asyncio.sleep(2)
159+
160+
# Cancel the receive task
161+
receive_task.cancel()
162+
try:
163+
await receive_task
164+
except asyncio.CancelledError:
165+
pass
166+
167+
await receive_and_interrupt()
168+
169+
except Exception as e:
170+
print(f"Error: {e}")
171+
finally:
172+
await client.disconnect()
173+
print("\nDisconnected")
174+
175+
176+
async def main():
177+
"""Run all examples."""
178+
# Run string mode example
179+
await example_string_mode()
180+
181+
# Run streaming mode example
182+
await example_streaming_mode()
183+
184+
# Run context manager example
185+
await example_with_context_manager()
186+
187+
# Run interrupt example
188+
await example_with_interrupt()
189+
190+
191+
if __name__ == "__main__":
192+
asyncio.run(main())

src/claude_code_sdk/__init__.py

Lines changed: 7 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
"""Claude SDK for Python."""
22

3-
import os
4-
from collections.abc import AsyncIterator
53

64
from ._errors import (
75
ClaudeSDKError,
@@ -10,8 +8,12 @@
108
CLINotFoundError,
119
ProcessError,
1210
)
13-
from ._internal.client import InternalClient
11+
1412
from ._internal.transport import Transport
13+
14+
from .client import ClaudeSDKClient
15+
from .query import query
16+
1517
from .types import (
1618
AssistantMessage,
1719
ClaudeCodeOptions,
@@ -30,10 +32,11 @@
3032
__version__ = "0.0.14"
3133

3234
__all__ = [
33-
# Main function
35+
# Main exports
3436
"query",
3537
# Transport
3638
"Transport",
39+
"ClaudeSDKClient",
3740
# Types
3841
"PermissionMode",
3942
"McpServerConfig",
@@ -54,62 +57,3 @@
5457
"ProcessError",
5558
"CLIJSONDecodeError",
5659
]
57-
58-
59-
async def query(
60-
*, prompt: str, options: ClaudeCodeOptions | None = None, transport: Transport | None = None
61-
) -> AsyncIterator[Message]:
62-
"""
63-
Query Claude Code.
64-
65-
Python SDK for interacting with Claude Code.
66-
67-
Args:
68-
prompt: The prompt to send to Claude
69-
options: Optional configuration (defaults to ClaudeCodeOptions() if None).
70-
Set options.permission_mode to control tool execution:
71-
- 'default': CLI prompts for dangerous tools
72-
- 'acceptEdits': Auto-accept file edits
73-
- 'bypassPermissions': Allow all tools (use with caution)
74-
Set options.cwd for working directory.
75-
transport: Optional transport implementation. If provided, this will be used
76-
instead of the default transport selection based on options.
77-
The transport will be automatically configured with the prompt and options.
78-
79-
Yields:
80-
Messages from the conversation
81-
82-
83-
Example:
84-
```python
85-
# Simple usage
86-
async for message in query(prompt="Hello"):
87-
print(message)
88-
89-
# With options
90-
async for message in query(
91-
prompt="Hello",
92-
options=ClaudeCodeOptions(
93-
system_prompt="You are helpful",
94-
cwd="/home/user"
95-
)
96-
):
97-
print(message)
98-
99-
# With custom transport (no need to pass prompt to transport)
100-
101-
async for message in query(
102-
prompt="Hello",
103-
):
104-
print(message)
105-
```
106-
"""
107-
if options is None:
108-
options = ClaudeCodeOptions()
109-
110-
os.environ["CLAUDE_CODE_ENTRYPOINT"] = "sdk-py"
111-
112-
client = InternalClient()
113-
114-
async for message in client.process_query(prompt=prompt, options=options, transport=transport):
115-
yield message
Lines changed: 8 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
"""Internal client implementation."""
22

3-
from collections.abc import AsyncIterator
3+
from collections.abc import AsyncIterable, AsyncIterator
44
from typing import Any
55

6+
67
from ..types import (
78
AssistantMessage,
89
ClaudeCodeOptions,
@@ -16,6 +17,10 @@
1617
UserMessage,
1718
)
1819
from .transport import Transport
20+
21+
from ..types import ClaudeCodeOptions, Message
22+
from .message_parser import parse_message
23+
1924
from .transport.subprocess_cli import SubprocessCLITransport
2025

2126

@@ -34,71 +39,17 @@ async def process_query(
3439
if transport is not None:
3540
chosen_transport = transport
3641
else:
37-
chosen_transport = SubprocessCLITransport()
42+
chosen_transport = SubprocessCLITransport(prompt, options)
3843

3944
try:
4045
# Configure the transport with prompt and options
4146
chosen_transport.configure(prompt, options)
4247
await chosen_transport.connect()
4348

4449
async for data in chosen_transport.receive_messages():
45-
message = self._parse_message(data)
50+
message = parse_message(data)
4651
if message:
4752
yield message
4853

4954
finally:
5055
await chosen_transport.disconnect()
51-
52-
def _parse_message(self, data: dict[str, Any]) -> Message | None:
53-
"""Parse message from CLI output, trusting the structure."""
54-
55-
match data["type"]:
56-
case "user":
57-
return UserMessage(content=data["message"]["content"])
58-
59-
case "assistant":
60-
content_blocks: list[ContentBlock] = []
61-
for block in data["message"]["content"]:
62-
match block["type"]:
63-
case "text":
64-
content_blocks.append(TextBlock(text=block["text"]))
65-
case "tool_use":
66-
content_blocks.append(
67-
ToolUseBlock(
68-
id=block["id"],
69-
name=block["name"],
70-
input=block["input"],
71-
)
72-
)
73-
case "tool_result":
74-
content_blocks.append(
75-
ToolResultBlock(
76-
tool_use_id=block["tool_use_id"],
77-
content=block.get("content"),
78-
is_error=block.get("is_error"),
79-
)
80-
)
81-
82-
return AssistantMessage(content=content_blocks)
83-
84-
case "system":
85-
return SystemMessage(
86-
subtype=data["subtype"],
87-
data=data,
88-
)
89-
90-
case "result":
91-
return ResultMessage(
92-
subtype=data["subtype"],
93-
duration_ms=data["duration_ms"],
94-
duration_api_ms=data["duration_api_ms"],
95-
is_error=data["is_error"],
96-
num_turns=data["num_turns"],
97-
session_id=data["session_id"],
98-
total_cost_usd=data.get("total_cost_usd"),
99-
usage=data.get("usage"),
100-
result=data.get("result"),
101-
)
102-
103-
case _:
104-
return None

0 commit comments

Comments
 (0)