Skip to content

Commit 08b11cb

Browse files
authored
fix(llma): streaming providers with tool calls (#319)
* fix(llma): tool calls in streaming Anthropic * fix(llma): Gemini content * fix(llma): extract converters for providers * fix(llma): continuation of DRY refactoring * fix(llma): add $ai_tools to streaming Gemini * fix(llma): tool calls in streaming Gemini * fix(llma): tool calls in streaming OpenAI Chat Completions * fix(llma): fix test * fix(llma): run ruff * fix(llma): fix types * fix(llma): run ruff * chore(llma): run mypy baseline sync * chore(llma): bump version * fix(llma): fix test * chore(llma): update CHANGELOG * fix(llma): Responses API streaming tokens * fix(llma): run ruff * fix(llma): run ruff
1 parent cee26bb commit 08b11cb

File tree

19 files changed

+2863
-786
lines changed

19 files changed

+2863
-786
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
# 6.7.2 - 2025-09-03
2+
3+
- fix: tool call results in streaming providers
4+
15
# 6.7.1 - 2025-09-01
26

37
- fix: Add base64 inline image sanitization

mypy-baseline.txt

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,5 @@ posthog/client.py:0: error: "None" has no attribute "start" [attr-defined]
3636
posthog/client.py:0: error: "None" has no attribute "get" [attr-defined]
3737
posthog/client.py:0: error: Statement is unreachable [unreachable]
3838
posthog/client.py:0: error: Statement is unreachable [unreachable]
39-
posthog/ai/utils.py:0: error: Need type annotation for "output" (hint: "output: list[<type>] = ...") [var-annotated]
40-
posthog/ai/utils.py:0: error: Function "builtins.any" is not valid as a type [valid-type]
41-
posthog/ai/utils.py:0: note: Perhaps you meant "typing.Any" instead of "any"?
42-
posthog/ai/utils.py:0: error: Function "builtins.any" is not valid as a type [valid-type]
43-
posthog/ai/utils.py:0: note: Perhaps you meant "typing.Any" instead of "any"?
4439
posthog/client.py:0: error: Name "urlparse" already defined (possibly by an import) [no-redef]
4540
posthog/client.py:0: error: Name "parse_qs" already defined (possibly by an import) [no-redef]

posthog/ai/anthropic/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@
66
AsyncAnthropicBedrock,
77
AsyncAnthropicVertex,
88
)
9+
from .anthropic_converter import (
10+
format_anthropic_response,
11+
format_anthropic_input,
12+
extract_anthropic_tools,
13+
format_anthropic_streaming_content,
14+
)
915

1016
__all__ = [
1117
"Anthropic",
@@ -14,4 +20,8 @@
1420
"AsyncAnthropicBedrock",
1521
"AnthropicVertex",
1622
"AsyncAnthropicVertex",
23+
"format_anthropic_response",
24+
"format_anthropic_input",
25+
"extract_anthropic_tools",
26+
"format_anthropic_streaming_content",
1727
]

posthog/ai/anthropic/anthropic.py

Lines changed: 93 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,19 @@
88

99
import time
1010
import uuid
11-
from typing import Any, Dict, Optional
11+
from typing import Any, Dict, List, Optional
1212

13+
from posthog.ai.types import StreamingContentBlock, ToolInProgress
1314
from posthog.ai.utils import (
1415
call_llm_and_track_usage,
15-
get_model_params,
16-
merge_system_prompt,
17-
with_privacy_mode,
16+
merge_usage_stats,
17+
)
18+
from posthog.ai.anthropic.anthropic_converter import (
19+
extract_anthropic_usage_from_event,
20+
handle_anthropic_content_block_start,
21+
handle_anthropic_text_delta,
22+
handle_anthropic_tool_delta,
23+
finalize_anthropic_tool_input,
1824
)
1925
from posthog.ai.sanitization import sanitize_anthropic
2026
from posthog.client import Client as PostHogClient
@@ -62,6 +68,7 @@ def create(
6268
posthog_groups: Optional group analytics properties
6369
**kwargs: Arguments passed to Anthropic's messages.create
6470
"""
71+
6572
if posthog_trace_id is None:
6673
posthog_trace_id = str(uuid.uuid4())
6774

@@ -120,34 +127,65 @@ def _create_streaming(
120127
):
121128
start_time = time.time()
122129
usage_stats: Dict[str, int] = {"input_tokens": 0, "output_tokens": 0}
123-
accumulated_content = []
130+
accumulated_content = ""
131+
content_blocks: List[StreamingContentBlock] = []
132+
tools_in_progress: Dict[str, ToolInProgress] = {}
133+
current_text_block: Optional[StreamingContentBlock] = None
124134
response = super().create(**kwargs)
125135

126136
def generator():
127137
nonlocal usage_stats
128-
nonlocal accumulated_content # noqa: F824
138+
nonlocal accumulated_content
139+
nonlocal content_blocks
140+
nonlocal tools_in_progress
141+
nonlocal current_text_block
142+
129143
try:
130144
for event in response:
131-
if hasattr(event, "usage") and event.usage:
132-
usage_stats = {
133-
k: getattr(event.usage, k, 0)
134-
for k in [
135-
"input_tokens",
136-
"output_tokens",
137-
"cache_read_input_tokens",
138-
"cache_creation_input_tokens",
139-
]
140-
}
141-
142-
if hasattr(event, "content") and event.content:
143-
accumulated_content.append(event.content)
145+
# Extract usage stats from event
146+
event_usage = extract_anthropic_usage_from_event(event)
147+
merge_usage_stats(usage_stats, event_usage)
148+
149+
# Handle content block start events
150+
if hasattr(event, "type") and event.type == "content_block_start":
151+
block, tool = handle_anthropic_content_block_start(event)
152+
153+
if block:
154+
content_blocks.append(block)
155+
156+
if block.get("type") == "text":
157+
current_text_block = block
158+
else:
159+
current_text_block = None
160+
161+
if tool:
162+
tool_id = tool["block"].get("id")
163+
if tool_id:
164+
tools_in_progress[tool_id] = tool
165+
166+
# Handle text delta events
167+
delta_text = handle_anthropic_text_delta(event, current_text_block)
168+
169+
if delta_text:
170+
accumulated_content += delta_text
171+
172+
# Handle tool input delta events
173+
handle_anthropic_tool_delta(
174+
event, content_blocks, tools_in_progress
175+
)
176+
177+
# Handle content block stop events
178+
if hasattr(event, "type") and event.type == "content_block_stop":
179+
current_text_block = None
180+
finalize_anthropic_tool_input(
181+
event, content_blocks, tools_in_progress
182+
)
144183

145184
yield event
146185

147186
finally:
148187
end_time = time.time()
149188
latency = end_time - start_time
150-
output = "".join(accumulated_content)
151189

152190
self._capture_streaming_event(
153191
posthog_distinct_id,
@@ -158,7 +196,8 @@ def generator():
158196
kwargs,
159197
usage_stats,
160198
latency,
161-
output,
199+
content_blocks,
200+
accumulated_content,
162201
)
163202

164203
return generator()
@@ -173,47 +212,38 @@ def _capture_streaming_event(
173212
kwargs: Dict[str, Any],
174213
usage_stats: Dict[str, int],
175214
latency: float,
176-
output: str,
215+
content_blocks: List[StreamingContentBlock],
216+
accumulated_content: str,
177217
):
178-
if posthog_trace_id is None:
179-
posthog_trace_id = str(uuid.uuid4())
180-
181-
event_properties = {
182-
"$ai_provider": "anthropic",
183-
"$ai_model": kwargs.get("model"),
184-
"$ai_model_parameters": get_model_params(kwargs),
185-
"$ai_input": with_privacy_mode(
186-
self._client._ph_client,
187-
posthog_privacy_mode,
188-
sanitize_anthropic(merge_system_prompt(kwargs, "anthropic")),
189-
),
190-
"$ai_output_choices": with_privacy_mode(
191-
self._client._ph_client,
192-
posthog_privacy_mode,
193-
[{"content": output, "role": "assistant"}],
194-
),
195-
"$ai_http_status": 200,
196-
"$ai_input_tokens": usage_stats.get("input_tokens", 0),
197-
"$ai_output_tokens": usage_stats.get("output_tokens", 0),
198-
"$ai_cache_read_input_tokens": usage_stats.get(
199-
"cache_read_input_tokens", 0
200-
),
201-
"$ai_cache_creation_input_tokens": usage_stats.get(
202-
"cache_creation_input_tokens", 0
218+
from posthog.ai.types import StreamingEventData
219+
from posthog.ai.anthropic.anthropic_converter import (
220+
standardize_anthropic_usage,
221+
format_anthropic_streaming_input,
222+
format_anthropic_streaming_output_complete,
223+
)
224+
from posthog.ai.utils import capture_streaming_event
225+
226+
# Prepare standardized event data
227+
formatted_input = format_anthropic_streaming_input(kwargs)
228+
sanitized_input = sanitize_anthropic(formatted_input)
229+
230+
event_data = StreamingEventData(
231+
provider="anthropic",
232+
model=kwargs.get("model", "unknown"),
233+
base_url=str(self._client.base_url),
234+
kwargs=kwargs,
235+
formatted_input=sanitized_input,
236+
formatted_output=format_anthropic_streaming_output_complete(
237+
content_blocks, accumulated_content
203238
),
204-
"$ai_latency": latency,
205-
"$ai_trace_id": posthog_trace_id,
206-
"$ai_base_url": str(self._client.base_url),
207-
**(posthog_properties or {}),
208-
}
209-
210-
if posthog_distinct_id is None:
211-
event_properties["$process_person_profile"] = False
212-
213-
if hasattr(self._client._ph_client, "capture"):
214-
self._client._ph_client.capture(
215-
distinct_id=posthog_distinct_id or posthog_trace_id,
216-
event="$ai_generation",
217-
properties=event_properties,
218-
groups=posthog_groups,
219-
)
239+
usage_stats=standardize_anthropic_usage(usage_stats),
240+
latency=latency,
241+
distinct_id=posthog_distinct_id,
242+
trace_id=posthog_trace_id,
243+
properties=posthog_properties,
244+
privacy_mode=posthog_privacy_mode,
245+
groups=posthog_groups,
246+
)
247+
248+
# Use the common capture function
249+
capture_streaming_event(self._client._ph_client, event_data)

0 commit comments

Comments
 (0)