4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
# 6.7.2 - 2025-09-03

- fix: tool call results in streaming providers

# 6.7.1 - 2025-09-01

- fix: Add base64 inline image sanitization
89 changes: 70 additions & 19 deletions mypy-baseline.txt

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions posthog/ai/anthropic/__init__.py
@@ -6,6 +6,12 @@
AsyncAnthropicBedrock,
AsyncAnthropicVertex,
)
from .anthropic_converter import (
format_anthropic_response,
format_anthropic_input,
extract_anthropic_tools,
format_anthropic_streaming_content,
)

__all__ = [
"Anthropic",
@@ -14,4 +20,8 @@
"AsyncAnthropicBedrock",
"AnthropicVertex",
"AsyncAnthropicVertex",
"format_anthropic_response",
"format_anthropic_input",
"extract_anthropic_tools",
"format_anthropic_streaming_content",
]
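
The converter helpers exported here are implemented in `anthropic_converter.py`, which is not rendered in this view. A minimal sketch of how the new exports might be called, assuming the formatter takes the raw `messages` payload and the tool extractor takes the request kwargs (signatures are not shown in this diff, so the argument shapes are assumptions):

```python
# Illustrative only: converter signatures are assumed, not taken from this diff.
from posthog.ai.anthropic import (
    extract_anthropic_tools,
    format_anthropic_input,
)

request_kwargs = {
    "model": "claude-3-5-sonnet-latest",
    "messages": [{"role": "user", "content": "What's the weather in SF?"}],
    "tools": [
        {
            "name": "get_weather",
            "description": "Get the current weather for a city.",
            "input_schema": {
                "type": "object",
                "properties": {"city": {"type": "string"}},
                "required": ["city"],
            },
        }
    ],
}

# Assumed usage: normalize messages and pull tool definitions out of the
# request so they can be attached to the $ai_generation event.
formatted_input = format_anthropic_input(request_kwargs["messages"])
tool_definitions = extract_anthropic_tools(request_kwargs)
```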
156 changes: 93 additions & 63 deletions posthog/ai/anthropic/anthropic.py
@@ -8,13 +8,19 @@

import time
import uuid
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional

from posthog.ai.types import StreamingContentBlock, ToolInProgress
from posthog.ai.utils import (
call_llm_and_track_usage,
get_model_params,
merge_system_prompt,
with_privacy_mode,
merge_usage_stats,
)
from posthog.ai.anthropic.anthropic_converter import (
extract_anthropic_usage_from_event,
handle_anthropic_content_block_start,
handle_anthropic_text_delta,
handle_anthropic_tool_delta,
finalize_anthropic_tool_input,
)
from posthog.ai.sanitization import sanitize_anthropic
from posthog.client import Client as PostHogClient
@@ -62,6 +68,7 @@ def create(
posthog_groups: Optional group analytics properties
**kwargs: Arguments passed to Anthropic's messages.create
"""

if posthog_trace_id is None:
posthog_trace_id = str(uuid.uuid4())

@@ -120,34 +127,65 @@ def _create_streaming(
):
start_time = time.time()
usage_stats: Dict[str, int] = {"input_tokens": 0, "output_tokens": 0}
accumulated_content = []
accumulated_content = ""
content_blocks: List[StreamingContentBlock] = []
tools_in_progress: Dict[str, ToolInProgress] = {}
current_text_block: Optional[StreamingContentBlock] = None
response = super().create(**kwargs)

def generator():
nonlocal usage_stats
nonlocal accumulated_content # noqa: F824
nonlocal accumulated_content
nonlocal content_blocks
nonlocal tools_in_progress
nonlocal current_text_block

try:
for event in response:
if hasattr(event, "usage") and event.usage:
usage_stats = {
k: getattr(event.usage, k, 0)
for k in [
"input_tokens",
"output_tokens",
"cache_read_input_tokens",
"cache_creation_input_tokens",
]
}

if hasattr(event, "content") and event.content:
accumulated_content.append(event.content)
# Extract usage stats from event
event_usage = extract_anthropic_usage_from_event(event)
merge_usage_stats(usage_stats, event_usage)

# Handle content block start events
if hasattr(event, "type") and event.type == "content_block_start":
block, tool = handle_anthropic_content_block_start(event)

if block:
content_blocks.append(block)

if block.get("type") == "text":
current_text_block = block
else:
current_text_block = None

if tool:
tool_id = tool["block"].get("id")
if tool_id:
tools_in_progress[tool_id] = tool

# Handle text delta events
delta_text = handle_anthropic_text_delta(event, current_text_block)

if delta_text:
accumulated_content += delta_text

# Handle tool input delta events
handle_anthropic_tool_delta(
event, content_blocks, tools_in_progress
)

# Handle content block stop events
if hasattr(event, "type") and event.type == "content_block_stop":
current_text_block = None
finalize_anthropic_tool_input(
event, content_blocks, tools_in_progress
)

yield event

finally:
end_time = time.time()
latency = end_time - start_time
output = "".join(accumulated_content)

self._capture_streaming_event(
posthog_distinct_id,
@@ -158,7 +196,8 @@ def generator():
kwargs,
usage_stats,
latency,
output,
content_blocks,
accumulated_content,
)

return generator()
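
The `handle_anthropic_*` helpers driving this loop live in `anthropic_converter.py` and are not part of the rendered diff. A minimal sketch of the text-delta path, assuming events follow Anthropic's documented streaming shapes (a `content_block_delta` event carrying a `text_delta`) and that `StreamingContentBlock` is a dict with a `text` field — both assumptions, not the shipped code:

```python
from typing import Optional


def handle_text_delta_sketch(event, current_text_block) -> Optional[str]:
    """Assumed behavior of handle_anthropic_text_delta, for illustration."""
    # Only content_block_delta events with a text_delta payload carry text.
    if getattr(event, "type", None) != "content_block_delta":
        return None
    delta = getattr(event, "delta", None)
    if delta is None or getattr(delta, "type", None) != "text_delta":
        return None
    text = getattr(delta, "text", "") or ""
    # Accumulate onto the currently open text block, if one is being tracked.
    if current_text_block is not None:
        current_text_block["text"] = current_text_block.get("text", "") + text
    return text
```

Tool-use blocks take the parallel path: `input_json_delta` fragments accumulate in `tools_in_progress` until `content_block_stop`, which is consistent with the `finalize_anthropic_tool_input` call above.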
@@ -173,47 +212,38 @@ def _capture_streaming_event(
kwargs: Dict[str, Any],
usage_stats: Dict[str, int],
latency: float,
output: str,
content_blocks: List[StreamingContentBlock],
accumulated_content: str,
):
if posthog_trace_id is None:
posthog_trace_id = str(uuid.uuid4())

event_properties = {
"$ai_provider": "anthropic",
"$ai_model": kwargs.get("model"),
"$ai_model_parameters": get_model_params(kwargs),
"$ai_input": with_privacy_mode(
self._client._ph_client,
posthog_privacy_mode,
sanitize_anthropic(merge_system_prompt(kwargs, "anthropic")),
),
"$ai_output_choices": with_privacy_mode(
self._client._ph_client,
posthog_privacy_mode,
[{"content": output, "role": "assistant"}],
),
"$ai_http_status": 200,
"$ai_input_tokens": usage_stats.get("input_tokens", 0),
"$ai_output_tokens": usage_stats.get("output_tokens", 0),
"$ai_cache_read_input_tokens": usage_stats.get(
"cache_read_input_tokens", 0
),
"$ai_cache_creation_input_tokens": usage_stats.get(
"cache_creation_input_tokens", 0
from posthog.ai.types import StreamingEventData
from posthog.ai.anthropic.anthropic_converter import (
standardize_anthropic_usage,
format_anthropic_streaming_input,
format_anthropic_streaming_output_complete,
)
from posthog.ai.utils import capture_streaming_event

# Prepare standardized event data
formatted_input = format_anthropic_streaming_input(kwargs)
sanitized_input = sanitize_anthropic(formatted_input)

event_data = StreamingEventData(
provider="anthropic",
model=kwargs.get("model", "unknown"),
base_url=str(self._client.base_url),
kwargs=kwargs,
formatted_input=sanitized_input,
formatted_output=format_anthropic_streaming_output_complete(
content_blocks, accumulated_content
),
"$ai_latency": latency,
"$ai_trace_id": posthog_trace_id,
"$ai_base_url": str(self._client.base_url),
**(posthog_properties or {}),
}

if posthog_distinct_id is None:
event_properties["$process_person_profile"] = False

if hasattr(self._client._ph_client, "capture"):
self._client._ph_client.capture(
distinct_id=posthog_distinct_id or posthog_trace_id,
event="$ai_generation",
properties=event_properties,
groups=posthog_groups,
)
usage_stats=standardize_anthropic_usage(usage_stats),
latency=latency,
distinct_id=posthog_distinct_id,
trace_id=posthog_trace_id,
properties=posthog_properties,
privacy_mode=posthog_privacy_mode,
groups=posthog_groups,
)

# Use the common capture function
capture_streaming_event(self._client._ph_client, event_data)
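
For context on how this code path is reached, a minimal usage sketch of the instrumented client with streaming enabled; the constructor arguments follow the posthog-python AI integration docs, and the key, host, and distinct ID values are placeholders:

```python
from posthog import Posthog
from posthog.ai.anthropic import Anthropic

posthog = Posthog("phc_your_project_key", host="https://us.i.posthog.com")

# The wrapper forwards to anthropic.Anthropic (the API key is read from
# ANTHROPIC_API_KEY here) and adds PostHog capture on top.
client = Anthropic(posthog_client=posthog)

stream = client.messages.create(
    model="claude-3-5-sonnet-latest",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
    posthog_distinct_id="user_123",
)

for event in stream:
    pass  # events are re-yielded unchanged by the generator above

# Once the stream is exhausted, the finally block fires and a single
# $ai_generation event is captured with usage, latency, and content blocks.
```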