Skip to content

Commit a2f24a3

Browse files
dltnclaude
andauthored
fix: wait for first result before closing stdin if SDK MCP present (#380)
Port SDK MCP fix from TypeScript to Python. Now, when SDK MCP servers or hooks are present, stream_input() waits for the first result message before closing stdin, allowing bidirectional control protocol communication to complete. Fixes repro in #266. The `query()` design requires input streams to be held open by the user for SDK MCP bidirectional communication to work. This has confused a lot of folks, so we're moving towards a more explicit lifecycle design. In the meantime, this is the way we've addressed it with V1 APIs in https://github.com/anthropics/claude-agent-sdk-typescript. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Claude <[email protected]>
1 parent 49482e1 commit a2f24a3

File tree

1 file changed

+34
-2
lines changed

1 file changed

+34
-2
lines changed

src/claude_agent_sdk/_internal/query.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,12 @@ def __init__(
107107
self._closed = False
108108
self._initialization_result: dict[str, Any] | None = None
109109

110+
# Track first result for proper stream closure with SDK MCP servers
111+
self._first_result_event = anyio.Event()
112+
self._stream_close_timeout = (
113+
float(os.environ.get("CLAUDE_CODE_STREAM_CLOSE_TIMEOUT", "60000")) / 1000.0
114+
) # Convert ms to seconds
115+
110116
async def initialize(self) -> dict[str, Any] | None:
111117
"""Initialize control protocol if in streaming mode.
112118
@@ -195,6 +201,10 @@ async def _read_messages(self) -> None:
195201
# TODO: Implement cancellation support
196202
continue
197203

204+
# Track results for proper stream closure
205+
if msg_type == "result":
206+
self._first_result_event.set()
207+
198208
# Regular SDK messages go to the stream
199209
await self._message_send.send(message)
200210

@@ -525,13 +535,35 @@ async def set_model(self, model: str | None) -> None:
525535
)
526536

527537
async def stream_input(self, stream: AsyncIterable[dict[str, Any]]) -> None:
528-
"""Stream input messages to transport."""
538+
"""Stream input messages to transport.
539+
540+
If SDK MCP servers or hooks are present, waits for the first result
541+
before closing stdin to allow bidirectional control protocol communication.
542+
"""
529543
try:
530544
async for message in stream:
531545
if self._closed:
532546
break
533547
await self.transport.write(json.dumps(message) + "\n")
534-
# After all messages sent, end input
548+
549+
# If we have SDK MCP servers or hooks that need bidirectional communication,
550+
# wait for first result before closing the channel
551+
has_hooks = bool(self.hooks)
552+
if self.sdk_mcp_servers or has_hooks:
553+
logger.debug(
554+
f"Waiting for first result before closing stdin "
555+
f"(sdk_mcp_servers={len(self.sdk_mcp_servers)}, has_hooks={has_hooks})"
556+
)
557+
try:
558+
with anyio.move_on_after(self._stream_close_timeout):
559+
await self._first_result_event.wait()
560+
logger.debug("Received first result, closing input stream")
561+
except Exception:
562+
logger.debug(
563+
"Timed out waiting for first result, closing input stream"
564+
)
565+
566+
# After all messages sent (and result received if needed), end input
535567
await self.transport.end_input()
536568
except Exception as e:
537569
logger.debug(f"Error streaming input: {e}")

0 commit comments

Comments
 (0)