Commit a5bd172

smhanan and claude committed
fix(streaming): tool calls and race conditions for SDK mode
This commit ports critical bug fixes to the dev/v0.2 architecture:

1. **Tool calls SSE-only bug** (processors.py)
   - Removed `output_format == "sse"` condition that excluded dict format
   - Tool calls now work in both SSE and dict (SDK) output formats
   - Added proper tool call indexing with enumerate()
   - Added debug logging for tool call yielding

2. **Non-atomic broadcast race condition** (stream_worker.py)
   - Removed racy has_listeners() check before broadcast()
   - Now always broadcasts and checks delivered count afterward
   - Fixes message loss with fast STDIO tools like filesystem

3. **Listener pre-registration race** (stream_handle.py)
   - Worker is now created but not started until listener is registered
   - Prevents messages from being lost before listener is ready
   - Fixes first-message loss with rapid tool responses

4. **Orphaned tool_result sanitization** (requests.py)
   - Added _sanitize_tool_results() function
   - Removes tool_result blocks that lack matching tool_use blocks
   - Converts orphaned results to text to preserve information
   - Prevents "unexpected tool_use_id" API errors

Tests added:
- test_tool_call_streaming.py: 4 tests for tool call streaming
- test_tool_result_sanitization.py: 17 tests for sanitization

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent aad4701 commit a5bd172

File tree

6 files changed: +1187 -27 lines changed


ccproxy/llms/formatters/openai_to_anthropic/requests.py

Lines changed: 90 additions & 0 deletions
```diff
@@ -6,10 +6,96 @@
 from typing import Any

 from ccproxy.core.constants import DEFAULT_MAX_TOKENS
+from ccproxy.core.logging import get_logger
 from ccproxy.llms.models import anthropic as anthropic_models
 from ccproxy.llms.models import openai as openai_models


+logger = get_logger(__name__)
+
+
+def _sanitize_tool_results(messages: list[dict[str, Any]]) -> list[dict[str, Any]]:
+    """Remove orphaned tool_result blocks that don't have matching tool_use blocks.
+
+    The Anthropic API requires that each tool_result block must have a corresponding
+    tool_use block in the immediately preceding assistant message. This function removes
+    tool_result blocks that don't meet this requirement, converting them to text to
+    preserve information.
+
+    Args:
+        messages: List of Anthropic format messages
+
+    Returns:
+        Sanitized messages with orphaned tool_results removed or converted to text
+    """
+    if not messages:
+        return messages
+
+    sanitized = []
+    for i, msg in enumerate(messages):
+        if msg.get("role") == "user" and isinstance(msg.get("content"), list):
+            # Find tool_use_ids from the immediately preceding assistant message
+            valid_tool_use_ids: set[str] = set()
+            if i > 0 and messages[i - 1].get("role") == "assistant":
+                prev_content = messages[i - 1].get("content", [])
+                if isinstance(prev_content, list):
+                    for block in prev_content:
+                        if isinstance(block, dict) and block.get("type") == "tool_use":
+                            tool_id = block.get("id")
+                            if tool_id:
+                                valid_tool_use_ids.add(tool_id)
+
+            # Filter content blocks
+            new_content = []
+            orphaned_results = []
+            for block in msg["content"]:
+                if isinstance(block, dict) and block.get("type") == "tool_result":
+                    tool_use_id = block.get("tool_use_id")
+                    if tool_use_id in valid_tool_use_ids:
+                        new_content.append(block)
+                    else:
+                        # Track orphaned tool_result for conversion to text
+                        orphaned_results.append(block)
+                        logger.warning(
+                            "orphaned_tool_result_removed",
+                            tool_use_id=tool_use_id,
+                            valid_ids=list(valid_tool_use_ids),
+                            message_index=i,
+                            category="message_sanitization",
+                        )
+                else:
+                    new_content.append(block)
+
+            # Convert orphaned results to text block to preserve information
+            if orphaned_results:
+                orphan_text = "[Previous tool results from compacted history]\n"
+                for orphan in orphaned_results:
+                    content = orphan.get("content", "")
+                    if isinstance(content, list):
+                        text_parts = []
+                        for c in content:
+                            if isinstance(c, dict) and c.get("type") == "text":
+                                text_parts.append(c.get("text", ""))
+                        content = "\n".join(text_parts)
+                    # Truncate long content
+                    content_str = str(content)
+                    if len(content_str) > 500:
+                        content_str = content_str[:500] + "..."
+                    orphan_text += f"- Tool {orphan.get('tool_use_id', 'unknown')}: {content_str}\n"
+
+                # Add as text block at the beginning
+                new_content.insert(0, {"type": "text", "text": orphan_text})
+
+            # Update message content (only if we have content left)
+            if new_content:
+                sanitized.append({**msg, "content": new_content})
+            # If no content left, skip this message entirely
+        else:
+            sanitized.append(msg)
+
+    return sanitized
+
+
 async def convert__openai_chat_to_anthropic_message__request(
     request: openai_models.ChatCompletionRequest,
 ) -> anthropic_models.CreateMessageRequest:
@@ -165,6 +251,9 @@ async def convert__openai_chat_to_anthropic_message__request(
         else:
             out_messages.append({"role": "user", "content": content})

+    # Sanitize tool_result blocks to ensure they have matching tool_use blocks
+    out_messages = _sanitize_tool_results(out_messages)
+
     payload_data: dict[str, Any] = {
         "model": model,
         "messages": out_messages,
@@ -581,4 +670,5 @@ def derive_thinking_config(
 __all__ = [
     "convert__openai_chat_to_anthropic_message__request",
     "convert__openai_responses_to_anthropic_message__request",
+    "_sanitize_tool_results",  # Exposed for testing
 ]
```
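
To make the sanitization rule concrete, here is a minimal usage sketch of `_sanitize_tool_results` as added above (import path taken from the file header; the messages and tool IDs are invented for illustration). An orphaned `tool_result` is dropped from the block list and its content is preserved as a prepended text block:

```python
from ccproxy.llms.formatters.openai_to_anthropic.requests import _sanitize_tool_results

# Hypothetical conversation: the assistant issued one tool call (toolu_01),
# but the user turn also carries a stale result (toolu_99), e.g. left over
# after history compaction.
messages = [
    {
        "role": "assistant",
        "content": [
            {"type": "tool_use", "id": "toolu_01", "name": "read_file", "input": {}}
        ],
    },
    {
        "role": "user",
        "content": [
            {"type": "tool_result", "tool_use_id": "toolu_01", "content": "ok"},
            {"type": "tool_result", "tool_use_id": "toolu_99", "content": "stale"},
        ],
    },
]

sanitized = _sanitize_tool_results(messages)

# Only the matched tool_result survives as a tool_result block...
kept = [
    b["tool_use_id"]
    for b in sanitized[1]["content"]
    if b.get("type") == "tool_result"
]
assert kept == ["toolu_01"]

# ...while the orphan's content is preserved in a text block at the front.
assert sanitized[1]["content"][0]["type"] == "text"
assert "toolu_99" in sanitized[1]["content"][0]["text"]
```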

ccproxy/llms/streaming/processors.py

Lines changed: 31 additions & 8 deletions
```diff
@@ -421,18 +421,33 @@ async def _process_chunk(
                 self.current_thinking_text = ""
                 self.current_thinking_signature = None

-            elif (
-                self.tool_calls
-                and self.enable_tool_calls
-                and self.output_format == "sse"
-            ):
-                # Send completed tool calls (only for SSE format, dict format sends immediately)
-                for tool_call in self.tool_calls.values():
+            elif self.tool_calls and self.enable_tool_calls:
+                # Send completed tool calls for both SSE and dict formats
+                # Previous bug: Only sent for SSE format, causing dict format (SDK mode) to miss tool calls
+                logger.debug(
+                    "openai_stream_sending_tool_calls",
+                    tool_count=len(self.tool_calls),
+                    output_format=self.output_format,
+                    category="streaming_conversion",
+                )
+
+                for tool_call_index, (tool_call_id, tool_call) in enumerate(
+                    self.tool_calls.items()
+                ):
+                    logger.debug(
+                        "openai_stream_tool_call_yielding",
+                        tool_call_id=tool_call_id,
+                        tool_name=tool_call["name"],
+                        has_arguments=bool(tool_call["arguments"]),
+                        index=tool_call_index,
+                        category="streaming_conversion",
+                    )
+
                     yield self._format_chunk_output(
                         delta={
                             "tool_calls": [
                                 {
-                                    "index": 0,
+                                    "index": tool_call_index,
                                     "id": tool_call["id"],
                                     "type": "function",
                                     "function": {
@@ -446,6 +461,14 @@
                         }
                     )

+                # Clear tool_calls after yielding to prevent duplicates
+                logger.debug(
+                    "openai_stream_clearing_tool_calls",
+                    cleared_count=len(self.tool_calls),
+                    category="streaming_conversion",
+                )
+                self.tool_calls.clear()
+
             elif chunk_type == "message_delta":
                 # Usage information
                 usage = chunk_data.get("usage", {})
```
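
To see what the fixed branch actually yields, here is a standalone sketch of the delta payloads for two buffered tool calls. The `tool_calls` dict below is invented, and the exact `function` sub-fields are assumed from the OpenAI streaming chunk format; the indexing and clearing mirror the diff above:

```python
# The buffered calls below are invented; in the processor, self.tool_calls is
# an ordered mapping from tool_call_id to accumulated call state.
tool_calls = {
    "call_a": {"id": "call_a", "name": "list_files", "arguments": '{"path": "."}'},
    "call_b": {"id": "call_b", "name": "read_file", "arguments": '{"path": "x.txt"}'},
}

deltas = []
for tool_call_index, (tool_call_id, tool_call) in enumerate(tool_calls.items()):
    deltas.append(
        {
            "tool_calls": [
                {
                    "index": tool_call_index,  # was hardcoded to 0 before the fix
                    "id": tool_call["id"],
                    "type": "function",
                    "function": {
                        # Field names assumed from the OpenAI chunk format
                        "name": tool_call["name"],
                        "arguments": tool_call["arguments"],
                    },
                }
            ]
        }
    )

# Each buffered call now gets its own index, so consumers that accumulate
# tool calls keyed by index no longer collapse them into one entry.
assert [d["tool_calls"][0]["index"] for d in deltas] == [0, 1]

tool_calls.clear()  # mirrors self.tool_calls.clear(), preventing duplicate emission
```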

ccproxy/plugins/claude_sdk/stream_handle.py

Lines changed: 36 additions & 12 deletions
```diff
@@ -74,26 +74,49 @@ def __init__(
     async def create_listener(self) -> AsyncIterator[Any]:
         """Create a new listener for this stream.

-        This method starts the worker on first listener and returns
-        an async iterator for consuming messages.
+        This method creates the worker if needed, pre-registers the listener,
+        then starts the worker. This ordering prevents race conditions where
+        fast STDIO tools could return results before the listener was ready.

         Yields:
             Messages from the stream
         """
-        # Start worker if needed
-        await self._ensure_worker_started()
+        # Create worker if needed (but don't start yet)
+        async with self._worker_lock:
+            if self._worker is None:
+                worker_id = f"{self.handle_id}-worker"
+                self._worker = StreamWorker(
+                    worker_id=worker_id,
+                    message_iterator=self._message_iterator,
+                    session_id=self.session_id,
+                    request_id=self.request_id,
+                    session_client=self._session_client,
+                    stream_handle=self,
+                )
+                logger.debug(
+                    "stream_handle_worker_created",
+                    handle_id=self.handle_id,
+                    worker_id=worker_id,
+                    session_id=self.session_id,
+                    category="streaming",
+                )

         if not self._worker:
-            raise RuntimeError("Failed to start stream worker")
+            raise RuntimeError("Failed to create stream worker")

-        # Create listener
+        # Pre-register listener BEFORE starting worker
+        # This fixes the race condition where fast STDIO tools could
+        # return results before the listener was ready
         queue = self._worker.get_message_queue()
         listener = await queue.create_listener()
         self._listeners[listener.listener_id] = listener

         if self._first_listener_at is None:
             self._first_listener_at = time.time()

+        # NOW start the worker (after listener is registered)
+        await self._worker.start()
+
         logger.debug(
             "stream_handle_listener_created",
             handle_id=self.handle_id,
@@ -135,11 +158,15 @@ async def create_listener(self) -> AsyncIterator[Any]:
         # Check if we should trigger cleanup
         await self._check_cleanup()

-    async def _ensure_worker_started(self) -> None:
-        """Ensure the worker is started, creating it if needed."""
+    async def _ensure_worker_created(self) -> None:
+        """Ensure the worker is created (but not started).
+
+        Note: Worker is now started in create_listener() AFTER listener
+        registration to prevent race conditions with fast STDIO tools.
+        """
         async with self._worker_lock:
             if self._worker is None:
-                # Create worker
+                # Create worker (but don't start - that happens after listener registration)
                 worker_id = f"{self.handle_id}-worker"
                 self._worker = StreamWorker(
                     worker_id=worker_id,
@@ -150,9 +177,6 @@ async def _ensure_worker_started(self) -> None:
                     stream_handle=self,  # Pass self for message tracking
                 )

-                # Start worker
-                await self._worker.start()
-
                 logger.debug(
                     "stream_handle_worker_created",
                     handle_id=self.handle_id,
```
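
The ordering fix is easiest to see in a toy reproduction. The sketch below is not ccproxy's actual `StreamWorker` or queue API, just a minimal asyncio model of why the listener must be registered before the producer starts when undelivered messages are not buffered:

```python
import asyncio


class Broadcast:
    """Toy broadcast bus: messages published with no subscribers are dropped."""

    def __init__(self) -> None:
        self.queues: list[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        q: asyncio.Queue = asyncio.Queue()
        self.queues.append(q)
        return q

    def publish(self, item: str) -> int:
        for q in self.queues:
            q.put_nowait(item)
        return len(self.queues)  # delivered count; 0 means the message was lost


async def producer(bus: Broadcast) -> None:
    # A fast STDIO tool can answer before the caller gets around to listening.
    bus.publish("first tool result")


async def main() -> None:
    bus = Broadcast()
    # Buggy order: start the producer task first, subscribe second ->
    # "first tool result" is published to zero queues and lost.
    # Fixed order (as in create_listener above): subscribe BEFORE starting.
    q = bus.subscribe()
    await asyncio.create_task(producer(bus))
    print(await q.get())  # first tool result


asyncio.run(main())
```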

ccproxy/plugins/claude_sdk/stream_worker.py

Lines changed: 6 additions & 7 deletions
```diff
@@ -194,12 +194,12 @@ async def _run_worker(self) -> None:
                 self._total_messages += 1
                 self._last_message_time = time.time()

-                # Check if we have listeners
-                if await self._message_queue.has_listeners():
-                    # Broadcast to all listeners
-                    delivered_count = await self._message_queue.broadcast(message)
-                    self._messages_delivered += delivered_count
+                # Always broadcast - the queue handles no-listeners case atomically
+                # Previous bug: Separate has_listeners() check was racy with fast STDIO tools
+                delivered_count = await self._message_queue.broadcast(message)

+                if delivered_count > 0:
+                    self._messages_delivered += delivered_count
                     logger.trace(
                         "stream_worker_message_delivered",
                         worker_id=self.worker_id,
@@ -208,9 +208,8 @@
                         total_messages=self._total_messages,
                     )
                 else:
-                    # No listeners - discard message
+                    # No listeners at broadcast time - message discarded
                     self._messages_discarded += 1
-
                     logger.trace(
                         "stream_worker_message_discarded",
                         worker_id=self.worker_id,
```
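
The underlying problem is a classic time-of-check/time-of-use window. A sketch with a stub queue (not ccproxy's actual `MessageQueue`) shows how a listener arriving between `has_listeners()` and `broadcast()` loses a message, and why broadcasting unconditionally and branching on the delivered count closes the window:

```python
import asyncio


class StubQueue:
    """Stand-in for the message queue; not ccproxy's real API."""

    def __init__(self) -> None:
        self.listeners: list[asyncio.Queue] = []

    async def has_listeners(self) -> bool:
        return bool(self.listeners)

    async def broadcast(self, message: str) -> int:
        for q in self.listeners:
            q.put_nowait(message)
        return len(self.listeners)


async def main() -> None:
    queue = StubQueue()

    # Old, racy pattern: check, then use.
    if await queue.has_listeners():  # False at check time...
        await queue.broadcast("lost")
    # ...a listener registers "between" the check and the broadcast:
    queue.listeners.append(asyncio.Queue())
    # "lost" was never offered to the new listener.

    # Fixed pattern: always broadcast, then account by delivered count.
    delivered = await queue.broadcast("kept")
    print("delivered" if delivered > 0 else "discarded")  # delivered


asyncio.run(main())
```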
